All Projects → bredikhin → phoenix-postgresql-notify-listen-example

bredikhin / phoenix-postgresql-notify-listen-example

Licence: other
Publish/subscribe with PostgreSQL and Phoenix Framework

Programming Languages

elixir
2628 projects
elm
856 projects
CSS
56736 projects
javascript
184084 projects - #8 most used programming language
HTML
75241 projects

Projects that are alternatives of or similar to phoenix-postgresql-notify-listen-example

phoenix-liveview-15m.twitter
Based on the "real-time Twitter clone in 15 minutes with LiveView and Phoenix", from Chris McCord
Stars: ✭ 40 (+150%)
Mutual labels:  phoenix-framework
cloud-pubsub
Google Cloud PubSub client in rust
Stars: ✭ 27 (+68.75%)
Mutual labels:  pubsub
ucx chat
UcxUcc is a simple but powerful team collaboration suite of applications designed to improve communications, information sharing and productivity for the businesses small and large.
Stars: ✭ 54 (+237.5%)
Mutual labels:  phoenix-framework
go-msg
Pub/Sub Message Primitives for Go
Stars: ✭ 44 (+175%)
Mutual labels:  pubsub
pyTwitchAPI
A Python 3.7 implementation of the Twitch API, EventSub and PubSub
Stars: ✭ 132 (+725%)
Mutual labels:  pubsub
google-cloud
A collection of Google Cloud Platform (GCP) plugins
Stars: ✭ 34 (+112.5%)
Mutual labels:  pubsub
distillery packager
Elixir lib for creating Debian and RPM packages with Distillery
Stars: ✭ 25 (+56.25%)
Mutual labels:  phoenix-framework
blog
An example Phoenix 1.3 application
Stars: ✭ 14 (-12.5%)
Mutual labels:  phoenix-framework
phoenix live view tablefilter
Table Filter with Phoenix LiveView
Stars: ✭ 15 (-6.25%)
Mutual labels:  phoenix-framework
ws.q
Simple library for websockets in kdb+/q
Stars: ✭ 27 (+68.75%)
Mutual labels:  pubsub
coophub
Cooperatives repos over the world! 🌈🌎
Stars: ✭ 53 (+231.25%)
Mutual labels:  phoenix-framework
tutorials as code
so that stuff read/seen don't get muddled up with time
Stars: ✭ 42 (+162.5%)
Mutual labels:  phoenix-framework
pg-pubsub
Reliable PostgreSQL LISTEN/NOTIFY with inter-process lock support
Stars: ✭ 50 (+212.5%)
Mutual labels:  pubsub
dry-events
Pub/sub system
Stars: ✭ 102 (+537.5%)
Mutual labels:  pubsub
ipfs-chat
Real-time P2P messenger using go-ipfs pubsub. TUI. End-to-end encrypted texting & file-sharing. NAT traversal.
Stars: ✭ 84 (+425%)
Mutual labels:  pubsub
workers-pubsub
Google Pub/Sub API for Cloudflare Workers (and Node.js)
Stars: ✭ 25 (+56.25%)
Mutual labels:  pubsub
one-pub-sub-lwc
One PubSub: A Declarative PubSub Library for Lightning Web Component and Aura Component
Stars: ✭ 19 (+18.75%)
Mutual labels:  pubsub
pg-ipc
IPC over PostgreSQL LISTEN/NOTIFY/UNLISTEN exposed as an EventEmitter
Stars: ✭ 27 (+68.75%)
Mutual labels:  pubsub
phoenix-rethinkdb-elm-webpack-example
Rephink: A real-time stack based on Phoenix Framework, RethinkDB, Elm, Webpack
Stars: ✭ 16 (+0%)
Mutual labels:  phoenix-framework
java-phoenix-channel
Phoenix Channel Java Client written in Kotlin
Stars: ✭ 20 (+25%)
Mutual labels:  phoenix-framework

Publish/subscribe with PostgreSQL and Phoenix Framework

Introduction

In our previous experiment we looked at a way to build a naturally scalable real-time application using Phoenix Framework, RethinkDB and Elm. Turned out RethinkDB makes going real-time quite simple and relatively painless. However, an interesting question occurred as a follow-up to that article: can we do the same with PostgreSQL? The short answer is yes, we can. But is it that simple? Well, let's see it for ourselves.

Fast-forward

Since we have already described the motivation behind these experiments with the real-time applications based on Elixir, Phoenix and Elm, as well as the setup process, in the original article (which you should check for more in-depth instructions), we'll just go quickly over the trivial parts of the setup here to reach the point where we can discuss something new:

  • create a new Phoenix project (we will be using v1.3 of the Phoenix Framework, the last one at the moment of this writing): mix phx.new pgsub && cd pgsub;
  • initialize Git repository: git init;
  • get the dependencies: mix deps.get;
  • make sure that PostgreSQL server is running: download your type of package from here or just follow one of the guides if you haven't installed it yet;
  • create the database: mix ecto.create (it will compile your application for the first time as well);
  • generate the repo / schema / migration: mix phx.gen.schema Pgsub.Todo todos task:string completed:boolean;
  • migrate: mix ecto.migrate;
  • remove Brunch.js: rm assets/brunch-config.js;
  • add Webpack with loaders / plugins: curl https://raw.githubusercontent.com/bredikhin/phoenix-rethinkdb-elm-webpack-example/master/assets/package.json > assets/package.json;
  • install npm dependencies: cd assets && npm i && cd ..;
  • configure Webpack: curl https://raw.githubusercontent.com/bredikhin/phoenix-rethinkdb-elm-webpack-example/master/assets/webpack.config.js > assets/webpack.config.js;
  • edit config/dev.exs, replace the watchers line with the following: watchers: [npm: ["run", "watch", cd: Path.expand("../assets", __DIR__)]];
  • add the frontend Elm app: git remote add example [email protected]:bredikhin/phoenix-rethinkdb-elm-webpack-example.git && git fetch example && git checkout example/master elm;
  • get Elm dependencies: cd elm && elm package install -y && cd ..;
  • switch the CSS file: git checkout example/master assets/css/app.css;
  • clean up the page template in lib/pgsub_web/templates/layout/app.html.eex:
...
  <body>
    <script src="<%= static_path(@conn, "/js/app.js") %>"></script>
  </body>
...
  • initialize the Elm application in assets/js/app.js:
// Elm application
let Elm = require('../../elm/Todo.elm')
let todomvc = Elm.Todo.fullscreen()
  • create a channel: mix phx.gen.channel Todo;
  • add your channel to the socket handler in lib/pgsub_web/channels/user_socket.ex: channel "todo:*", PgsubWeb.TodoChannel.

Ecto and channel broadcasting

Now that we are done with the common part of the setup, let's see how to handle messages from our client on the server side. Let's replace the content of lib/pgsub_web/channels/todo_channel.ex with the following:

defmodule PgsubWeb.TodoChannel do
  use PgsubWeb, :channel
  alias Pgsub.Pgsub.Todo
  alias Pgsub.Repo

  def join("todo:list", payload, socket) do
    if authorized?(payload) do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  def handle_in("todos", _payload, socket) do
    broadcast_all_to!(socket)

    {:noreply, socket}
  end

  def handle_in("insert", %{"todo" => data}, socket) do
    %Todo{}
    |> Todo.changeset(data)
    |> Repo.insert!

    broadcast_all_to!(socket)

    {:noreply, socket}
  end

  def handle_in("update", %{"todo" => data}, socket) do
    Todo
    |> Repo.get(data["id"])
    |> Todo.changeset(data)
    |> Repo.update!

    broadcast_all_to!(socket)

    {:noreply, socket}
  end

  def handle_in("delete", %{"todo" => data}, socket) do
    Todo
    |> Repo.get(data["id"])
    |> Repo.delete!

    broadcast_all_to!(socket)

    {:noreply, socket}
  end

  defp authorized?(_payload) do
    true
  end

  defp broadcast_all_to!(socket) do
    todos = Todo |> Repo.all
    PgsubWWeb.Endpoint.broadcast!(socket.topic, "todos", %{todos: todos})
  end
end

We also need to add the following encoder implementation to lib/pgsub/pgsub/todo.ex:

defimpl Poison.Encoder, for: Pgsub.Pgsub.Todo do
  def encode(%{__struct__: _} = struct, options) do
    map = struct
          |> Map.from_struct
          |> Map.drop([:__meta__, :__struct__])
    Poison.Encoder.Map.encode(map, options)
  end
end

This piece, essentially, strips our Todo structure from the meta fields (__meta__, __struct__) to help Poison encode it properly, so we could send it over the wire.

It's all looking slightly different from the RethinkDB example, but the good news is that thanks to the power of Ecto the code we just wrote will work with a great number of database engines having Ecto adapters. Isn't it amazing?

What's good as well is that if you start your Phoenix server based on this channel code and open http://localhost:4000/ in two browser tabs, you'll see that the changes you make in one tab get the other one instantly updated. So, does it mean we have reached our initial goal?

Well, not exactly, since these real-time updates are based on the fact that we have a single Phoenix server acting as a hub for all our changes and having all the clients listening to the same topic. Obviously, this will not work, for example, once we get multiple users subscribed to their own topics and (partially) common data, or if we get some data changes coming from a different application, etc.

But we can fix it easily. And here's where the database real-time tools come into play. In this case, we'll be leveraging PostgreSQL's publish-subscribe features.

LISTEN / NOTIFY

PostgreSQL, among its other features, has a built-in publish-subscribe functionality in the form of NOTIFY, LISTEN (and, well, UNLISTEN) commands. Since you can easily read about each of those in the official documentation, let's just dive in and continue with our example, uncovering the details about those commands as we go.

First, in order to get notified about some specific changes in the database (described by a trigger), let's create a trigger handler in PostgreSQL. Connect to your database (which would be named pgsub_dev by default) with some kind of a query tool (e.g. psql, you'd have to start it with something like psql -d pgsub_dev -w, or you can use some kind of GUI as well, pgAdmin, for example). Run the following:

CREATE OR REPLACE FUNCTION notify_todos_changes()
RETURNS trigger AS $$
DECLARE
  current_row RECORD;
BEGIN
  IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
    current_row := NEW;
  ELSE
    current_row := OLD;
  END IF;
  PERFORM pg_notify(
    'todos_changes',
    json_build_object(
      'table', TG_TABLE_NAME,
      'type', TG_OP,
      'id', current_row.id,
      'data', row_to_json(current_row)
    )::text
  );
  RETURN current_row;
END;
$$ LANGUAGE plpgsql;

What is happening here is that we are building a JSON-encoded modification report and are sending it as a notification using pg_notify function which also takes a channel name (where "a channel" is just a way of separating the notifications, not related to Phoenix channels), todos_changes in our case. Note that depending on the SQL command which triggered the notification, we either use the modified (NEW) row data in case of INSERT / UPDATE or the original (OLD) one in case of DELETE.

Next, let's add the trigger itself:

CREATE TRIGGER notify_todos_changes_trg
AFTER INSERT OR UPDATE OR DELETE
ON todos
FOR EACH ROW
EXECUTE PROCEDURE notify_todos_changes();

Here we're asking Postgres to run our previously created notify_todos_changes handler whenever any INSERT, UPDATE or DELETE on todos table is performed.

And that is it, that's all the setup you need to have on the database side. You can even try it out via psql and make sure it works: start your Phoenix server, perform some updates via your application and run LISTEN todos_changes;. You should see notifications coming in right away.

Handling Postgres notifications within your Phoenix application

Now that the database setup has been taken care of, the only thing that's left to do is to handle those notifications coming from PostgreSQL on the Phoenix side.

Let's start with creating our notification handling module in lib/pgsub/notifications.ex:

defmodule Pgsub.Notifications do
  use GenServer
  alias Pgsub.{Pgsub.Todo, Repo}

  import Poison, only: [decode!: 1]

  def start_link(channel) do
    GenServer.start_link(__MODULE__, channel)
  end

  def init(channel) do
    {:ok, pid} = Application.get_env(:pgsub, Pgsub.Repo)
      |> Postgrex.Notifications.start_link()
    ref = Postgrex.Notifications.listen!(pid, channel)

    data = Todo |> Repo.all

    {:ok, {pid, ref, channel, data}}
  end

  @topic "todo:list"

  def handle_info({:notification, pid, ref, "todos_changes", payload}, {pid, ref, channel, data}) do
    %{
      "data" => raw,
      "id" => id,
      "table" => "todos",
      "type" => type
    } = decode!(payload)
    row = for {k, v} <- raw, into: %{}, do: {String.to_atom(k), v}
    updated_data = case type do
      "UPDATE" -> Enum.map(data, fn x -> if x.id === id do Map.merge(x, row) else x end end)
      "INSERT" -> data ++ [struct(Todo, row)]
      "DELETE" -> Enum.filter(data,  &(&1.id !== id))
    end

    PgsubWeb.Endpoint.broadcast!(@topic, "todos", %{todos: updated_data})

    {:noreply, {pid, ref, channel, updated_data}}
  end
end

Note that the module itself is just a GenServer holding all the records in its state, updating them whenever it gets a notification from the database and broadcasting the updated data. Also, channel here is, once again, a Postgres notification channel, not related to Phoenix channels we're using to communicate between Elm and our server. Finally, don't forget to add a corresponding worker to the main supervision tree in lib/pgsub/application.ex:

worker(Pgsub.Notifications, ["todos_changes"], id: :todos_changes),

Essentially, that is it. If we start our Phoenix server now, we should be getting real-time updates to our application whenever the content of the database table changes. However, some of those may be hitting our Elm frontend twice, since in our channel module we still have some code that reads and broadcasts current list of entries every time it gets updated via the application itself. Let's remove it (putting the complete listing here for sake of simplicity):

defmodule PgsubWeb.TodoChannel do
  use PgsubWeb, :channel
  alias Pgsub.{Pgsub.Todo, Repo}

  def join("todo:list", payload, socket) do
    if authorized?(payload) do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  def handle_in("todos", _payload, socket) do
    todos = Todo |> Repo.all
    PgsubWeb.Endpoint.broadcast!(socket.topic, "todos", %{todos: todos})

    {:noreply, socket}
  end

  def handle_in("insert", %{"todo" => data}, socket) do
    %Todo{}
    |> Todo.changeset(data)
    |> Repo.insert!

    {:noreply, socket}
  end

  def handle_in("update", %{"todo" => data}, socket) do
    Todo
    |> Repo.get(data["id"])
    |> Todo.changeset(data)
    |> Repo.update!

    {:noreply, socket}
  end

  def handle_in("delete", %{"todo" => data}, socket) do
    Todo
    |> Repo.get(data["id"])
    |> Repo.delete!

    {:noreply, socket}
  end

  defp authorized?(_payload) do
    true
  end
end

Conclusion: RethinkDB vs PostgreSQL for real-time applications

So, what we managed to do here is to implement the same real-time example functionality with PostgreSQL as we had previously implemented using RethinkDB. Does it mean these two databases are completely interchangeable when it comes to building real-time applications? It obviously does not. Then which one should we use over another? Given the fact that our example is very basic and no benchmarking whatsoever is provided, I just can't advise you for or against any of these two. Let's, however, look at the facts:

  • PostgreSQL and RethinkDB are really different in its core: the first is a rather traditional relational database, the second is a NoSQL one, and there are plenty of valid reasons NoSQL / dynamic schema databases exist; some people can say a good RDBMS like Postgres can easily be used instead of a NoSQL one in most of use cases, others can argue that "schemaless" databases are the future of data storage, but the truth still is that we shouldn't run to extremes, since every task requires choosing appropriate database engine based on its specifics, and there's no universal solution here;
  • in terms of development, the approaches to the real-time functionality these two databases take are somewhat different: LISTEN / NOTIFY / TRIGGER mechanism is more low-level, whereas changefeeds give you in a certain sense more flexibility while designing and developing your application;
  • finally, yes, Postgres is mature and reliable, it has a solid production record and is backed by an experienced community with an impressive list of sponsors, but maybe the fact that RethinkDB is relatively young is not such a bad thing: with Linux Foundation behind it, let's give it a few years, and I'm sure it will be totally capable to compete against Postgres on some levels.

Anyway, even though the above example is heavily simplified and the description lacks any metrics and / or benchmarks, I hope that I at least got you interested, that you had fun and possibly learned something new. And, as usually, constructive feedback is welcome.

Credits

License

The MIT License

Copyright (c) 2017 Ruslan Bredikhin

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].