supabase / walrus

Licence: Apache-2.0 License
Applying RLS to PostgreSQL WAL

Programming Languages

python
139335 projects - #7 most used programming language
PLpgSQL
1095 projects

Projects that are alternatives of or similar to walrus

pg2k4j
Postgresql To Kinesis For Java
Stars: ✭ 69 (+16.95%)
Mutual labels:  replication, cdc
Repmgr
A lightweight replication manager for PostgreSQL (Postgres) - latest version 5.2.1 (2020-12-07)
Stars: ✭ 1,207 (+1945.76%)
Mutual labels:  postgres, replication
Postgresql cluster
PostgreSQL High-Availability Cluster (based on "Patroni" and "DCS(etcd)"). Automating deployment with Ansible.
Stars: ✭ 294 (+398.31%)
Mutual labels:  postgres, replication
Pg chameleon
MySQL to PostgreSQL replica system
Stars: ✭ 274 (+364.41%)
Mutual labels:  postgres, replication
Realtime
Listen to your PostgreSQL database in realtime via websockets. Built with Elixir.
Stars: ✭ 4,278 (+7150.85%)
Mutual labels:  postgres, cdc
Wal E
Continuous Archiving for Postgres
Stars: ✭ 3,313 (+5515.25%)
Mutual labels:  postgres, replication
Wal2json
JSON output plugin for changeset extraction
Stars: ✭ 705 (+1094.92%)
Mutual labels:  postgres, replication
Testgres
Testing framework for PostgreSQL and its extensions
Stars: ✭ 85 (+44.07%)
Mutual labels:  postgres, replication
Amazonriver
amazonriver is a service that syncs PostgreSQL data in real time to Elasticsearch or Kafka
Stars: ✭ 198 (+235.59%)
Mutual labels:  postgres, replication
Tunnel
PostgreSQL data synchronization tool (implemented in Java)
Stars: ✭ 122 (+106.78%)
Mutual labels:  postgres, replication
redis-connect-dist
Real-Time Event Streaming & Change Data Capture
Stars: ✭ 21 (-64.41%)
Mutual labels:  replication, cdc
pgcapture
A scalable Netflix DBLog implementation for PostgreSQL
Stars: ✭ 94 (+59.32%)
Mutual labels:  replication, cdc
MySqlCdc
MySQL/MariaDB binlog replication client for .NET
Stars: ✭ 71 (+20.34%)
Mutual labels:  replication, cdc
fostgres
RESTful APIs for Postgres databases
Stars: ✭ 22 (-62.71%)
Mutual labels:  postgres
eventide-postgres
Event Sourcing and Microservices Stack for Ruby
Stars: ✭ 92 (+55.93%)
Mutual labels:  postgres
SQLDBA-SSMS-Solution
This repository contains TSQL/PowerShell scripts to resolve issues with SQL Servers
Stars: ✭ 21 (-64.41%)
Mutual labels:  replication
subsocial-offchain
Off-chain storage for Subsocial blockchain. This app builds user feeds and notifications by subscribing to Substrate events.
Stars: ✭ 24 (-59.32%)
Mutual labels:  postgres
lighthouse
Easy clojure relational database queries, migrations and connection pooling
Stars: ✭ 19 (-67.8%)
Mutual labels:  postgres
debezium-incubator
Previously used repository for new Debezium modules and connectors in incubation phase (archived)
Stars: ✭ 89 (+50.85%)
Mutual labels:  cdc
OpenLogReplicator
Open Source Oracle database CDC written purely in C++. Reads transactions directly from database redo log files and streams in JSON or Protobuf format to: Kafka, RocketMQ, flat file, network stream (plain TCP/IP or ZeroMQ)
Stars: ✭ 112 (+89.83%)
Mutual labels:  cdc

walrus


Source Code: https://github.com/supabase/walrus


Write Ahead Log Realtime Unified Security (WALRUS) is a utility for managing realtime subscriptions to tables and applying row level security rules to those subscriptions.

The subscription stream is based on logical replication slots.

Summary

Managing Subscriptions

User subscriptions are managed through a table:

create table realtime.subscription (
    id bigint generated always as identity primary key,
    subscription_id uuid not null,
    entity regclass not null,
    filters realtime.user_defined_filter[] not null default '{}',
    claims jsonb not null,
    claims_role regrole not null generated always as (realtime.to_regrole(claims ->> 'role')) stored,
    created_at timestamp not null default timezone('utc', now()),

    unique (subscription_id, entity, filters)
);

where realtime.user_defined_filter is

create type realtime.user_defined_filter as (
    column_name text,
    op realtime.equality_op,
    value text
);

and the realtime.equality_op values are a subset of the PostgREST operators. Specifically:

create type realtime.equality_op as enum(
    'eq', 'neq', 'lt', 'lte', 'gt', 'gte'
);
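To make the operator semantics concrete, here is a minimal Python sketch of how a list of (column_name, op, value) filters could be evaluated against a record. It is an illustration only, not walrus's implementation. It assumes that a record must satisfy every filter, and the `cast` parameter is a hypothetical stand-in for converting the text filter value to the column's type.

```python
import operator

# Maps each realtime.equality_op label to a Python comparison.
# This mirrors the enum above; it is not walrus's implementation.
OPS = {
    "eq": operator.eq,
    "neq": operator.ne,
    "lt": operator.lt,
    "lte": operator.le,
    "gt": operator.gt,
    "gte": operator.ge,
}


def matches(record, filters, cast=int):
    """Return True when every (column_name, op, value) filter holds.

    Assumption: filters are combined with AND. `cast` is a hypothetical
    stand-in for casting the text value to the column's type; this sketch
    applies the same cast to every filter value.
    """
    return all(
        OPS[op](record[column_name], cast(value))
        for column_name, op, value in filters
    )


print(matches({"id": 6}, [("id", "eq", "6")]))  # True
```

Filter values are kept as text in realtime.user_defined_filter, which is why the sketch casts the value before comparing.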

For example, to subscribe to a table named public.notes where the id is 6 as the authenticated role:

insert into realtime.subscription(subscription_id, entity, filters, claims)
values ('832bd278-dac7-4bef-96be-e21c8a0023c4', 'public.notes', array[('id', 'eq', '6')::realtime.user_defined_filter], '{"role": "authenticated"}');

Reading WAL

This package exposes one public SQL function, realtime.apply_rls(jsonb). It processes the output of a wal2json-decoded logical replication slot and returns:

  • wal: (jsonb) The WAL record as JSONB, in the format shown below
  • is_rls_enabled: (bool) Whether the entity (table) the WAL record represents has row level security enabled
  • subscription_ids: (uuid[]) An array of subscription ids that should be notified about the WAL record
  • errors: (text[]) An array of errors

The jsonb WAL record is in the following format for inserts.

{
    "type": "INSERT",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "commit_timestamp": "2021-09-29T17:35:38Z",
    "record": {
        "id": 1,
        "user_id": 1,
        "details": "mow the lawn"
    }
}

updates:

{
    "type": "UPDATE",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "commit_timestamp": "2021-09-29T17:35:38Z",
    "record": {
        "id": 2,
        "user_id": 1,
        "details": "mow the lawn"
    },
    "old_record": {
        "id": 1
    }
}

deletes:

{
    "type": "DELETE",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "old_record": {
        "id": 1
    }
}

Important Notes:

  • Row level security is not applied to delete statements
  • The key/value pairs displayed in the old_record field include the table's identity columns for the record being updated/deleted. To display all values in old_record, set the replica identity for the table to full
  • When a delete occurs, the contents of old_record are broadcast to all subscribers to that table, so ensure that each table's replica identity only contains information that is safe to expose publicly
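As an illustration of how a consumer might read these records, the Python sketch below compares old_record with record for an UPDATE. The function name `changed_columns` is hypothetical and not part of walrus. Because old_record only carries the identity columns by default, the sketch can only report changes to those columns unless the replica identity is set to full.

```python
def changed_columns(wal):
    """List columns whose values differ between old_record and record.

    Only columns present in old_record can be compared, so with the
    default replica identity this reports identity-column changes only.
    """
    if wal["type"] != "UPDATE":
        return []
    old = wal.get("old_record", {})
    new = wal.get("record", {})
    return [name for name, value in old.items() if new.get(name) != value]


# The UPDATE example from above, trimmed to the keys the sketch reads.
update = {
    "type": "UPDATE",
    "record": {"id": 2, "user_id": 1, "details": "mow the lawn"},
    "old_record": {"id": 1},
}
print(changed_columns(update))  # ['id']
```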

Error States

Error 400: Bad Request, no primary key

If a WAL record for a table that does not have a primary key is passed through realtime.apply_rls, an error is returned.

Ex:

(
    {
        "type": ...,
        "schema": ...,
        "table": ...
    },                               -- wal
    true,                            -- is_rls_enabled
    [...],                           -- subscription_ids,
    array['Error 400: Bad Request, no primary key'] -- errors
)::realtime.wal_rls;

Error 401: Unauthorized

If a WAL record is passed through realtime.apply_rls and the subscription's claims_role does not have permission to select the primary key columns in that table, an Unauthorized error is returned with no WAL data.

Ex:

(
    {
        "type": ...,
        "schema": ...,
        "table": ...
    },                               -- wal
    true,                            -- is_rls_enabled
    [...],                           -- subscription_ids,
    array['Error 401: Unauthorized'] -- errors
)::realtime.wal_rls;

Error 413: Payload Too Large

When the size of the wal2json record exceeds max_record_bytes, the record and old_record keys are set to empty objects {} and the errors output array contains the string "Error 413: Payload Too Large".

Ex:

(
    {..., "record": {}, "old_record": {}}, -- wal
    true,                                  -- is_rls_enabled
    [...],                                 -- subscription_ids,
    array['Error 413: Payload Too Large']  -- errors
)::realtime.wal_rls;
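Since every error string above follows the pattern "Error <code>: <message>", a consumer could split the errors array into numeric codes and messages. The following Python sketch shows one way to do that; `parse_errors` is a hypothetical helper, not part of walrus, and it assumes the strings keep the format shown in this section.

```python
import re

# Matches strings such as "Error 401: Unauthorized".
ERROR_RE = re.compile(r"^Error (\d{3}): (.+)$")


def parse_errors(errors):
    """Turn the errors text[] output into (code, message) pairs.

    Strings that do not match the documented pattern are kept as
    (None, original_text) rather than dropped.
    """
    parsed = []
    for text in errors:
        match = ERROR_RE.match(text)
        if match:
            parsed.append((int(match.group(1)), match.group(2)))
        else:
            parsed.append((None, text))
    return parsed


print(parse_errors(["Error 413: Payload Too Large"]))
# [(413, 'Payload Too Large')]
```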

How it Works

Each WAL record is passed into realtime.apply_rls(jsonb) which:

  • impersonates each subscribed user by setting the appropriate role and request.jwt.claims that RLS policies depend on
  • queries for the row using its primary key values
  • applies the subscription's filters to check if the WAL record is filtered out
  • filters out all columns that are not visible to the user's role

Usage

Given a wal2json replication slot named realtime:

select * from pg_create_logical_replication_slot('realtime', 'wal2json')

A complete list of config options can be found in the wal2json documentation.

The stream can be polled with

select
    xyz.wal,
    xyz.is_rls_enabled,
    xyz.subscription_ids,
    xyz.errors
from
    pg_logical_slot_get_changes(
        'realtime', null, null,
        'include-pk', '1',
        'include-transaction', 'false',
        'include-timestamp', 'true',
        'write-in-chunks', 'true',
        'format-version', '2',
        'actions', 'insert,update,delete',
        'filter-tables', 'realtime.*'
    ),
    lateral (
        select
            x.wal,
            x.is_rls_enabled,
            x.subscription_ids,
            x.errors
        from
            realtime.apply_rls(data::jsonb) x(wal, is_rls_enabled, subscription_ids, errors)
    ) xyz
where
    xyz.subscription_ids[1] is not null

Or, if the stream should be filtered according to a publication:

with pub as (
    select
        concat_ws(
            ',',
            case when bool_or(pubinsert) then 'insert' else null end,
            case when bool_or(pubupdate) then 'update' else null end,
            case when bool_or(pubdelete) then 'delete' else null end
        ) as w2j_actions,
        coalesce(
            string_agg(
                realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
                ','
            ) filter (where ppt.tablename is not null),
            ''
        ) w2j_add_tables
    from
        pg_publication pp
        left join pg_publication_tables ppt
            on pp.pubname = ppt.pubname
    where
        pp.pubname = 'supabase_realtime'
    group by
        pp.pubname
    limit 1
),
w2j as (
    select
        x.*, pub.w2j_add_tables
    from
         pub,
         pg_logical_slot_get_changes(
            'realtime', null, null,
            'include-pk', '1',
            'include-transaction', 'false',
            'include-timestamp', 'true',
            'write-in-chunks', 'true',
            'format-version', '2',
            'actions', pub.w2j_actions,
            'add-tables', pub.w2j_add_tables
        ) x
)
select
    xyz.wal,
    xyz.is_rls_enabled,
    xyz.subscription_ids,
    xyz.errors
from
    w2j,
    realtime.apply_rls(
        wal := w2j.data::jsonb,
        max_record_bytes := 1048576
    ) xyz(wal, is_rls_enabled, subscription_ids, errors)
where
    w2j.w2j_add_tables <> ''
    and xyz.subscription_ids[1] is not null
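From application code, either polling statement can be run through any Python DB-API 2.0 cursor. The sketch below is a minimal example under that assumption: `poll` is a hypothetical helper, the driver and connection setup are left out, and `query` is expected to be one of the statements above.

```python
import json


def poll(cursor, query):
    """Run `query` once and yield one tuple per returned row.

    `cursor` is any DB-API 2.0 cursor. `query` is assumed to select
    wal, is_rls_enabled, subscription_ids and errors, in that order.
    """
    cursor.execute(query)
    for wal, is_rls_enabled, subscription_ids, errors in cursor.fetchall():
        # Depending on the driver, jsonb may arrive as text or already decoded.
        if isinstance(wal, str):
            wal = json.loads(wal)
        yield wal, is_rls_enabled, subscription_ids, errors
```

Note that pg_logical_slot_get_changes consumes the changes it returns, so each change is seen by only one call. A caller would typically invoke `poll` in a loop and hand each row to whatever delivers messages to subscribers.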

Configuration

max_record_bytes

max_record_bytes (default 1MB): Controls the maximum size of a WAL record that will be emitted with complete record and old_record data. When the size of the wal2json record exceeds max_record_bytes, the record and old_record keys are set to empty objects {} and the errors output array contains the string "Error 413: Payload Too Large".

Ex:

realtime.apply_rls(wal := w2j.data::jsonb, max_record_bytes := 1024*1024) x(wal, is_rls_enabled, subscription_ids, errors)
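The effect of the limit can be modelled in a few lines of Python. This is a sketch of the documented behaviour, not walrus's code: `enforce_max_record_bytes` is a hypothetical name, and measuring the size as the UTF-8 length of the serialized JSON is an assumption that may differ from how walrus measures the wal2json record.

```python
import json


def enforce_max_record_bytes(wal, errors, max_record_bytes=1024 * 1024):
    """Blank out record/old_record when the serialized record is too large.

    Assumption: size is approximated as the UTF-8 byte length of the
    JSON text. Returns a new (wal, errors) pair; inputs are not modified.
    """
    size = len(json.dumps(wal).encode("utf-8"))
    if size <= max_record_bytes:
        return wal, errors
    trimmed = dict(wal, record={}, old_record={})
    return trimmed, errors + ["Error 413: Payload Too Large"]


wal = {"type": "INSERT", "table": "todos", "record": {"details": "x" * 100}}
trimmed, errs = enforce_max_record_bytes(wal, [], max_record_bytes=64)
print(trimmed["record"], errs)  # {} ['Error 413: Payload Too Large']
```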

Installation

The project is SQL only and can be installed by executing the contents of sql/walrus--0.1.sql in a database instance.

Tests

Requires

  • Python 3.6+
  • docker-compose

pip install -e .

pytest

RFC Process

To open a request for comment (RFC), open a GitHub issue against this repo and select the RFC template.
