All Projects → johnbywater → Eventsourcing

johnbywater / Eventsourcing

Licence: bsd-3-clause
A library for event sourcing in Python.

Programming Languages

python
139335 projects - #7 most used programming language
python3
1442 projects

Projects that are alternatives of or similar to Eventsourcing

Eventflow.example
DDD+CQRS+Event-sourcing examples using EventFlow following CQRS-ES architecture. It is configured with RabbitMQ, MongoDB(Snapshot store), PostgreSQL(Read store), EventStore(GES). It's targeted to .Net Core 2.2 and include docker compose file.
Stars: ✭ 131 (-82.76%)
Mutual labels:  event-sourcing, eventsourcing, ddd, cqrs, domain-driven-design
Revo
Event Sourcing, CQRS and DDD framework for C#/.NET Core.
Stars: ✭ 162 (-78.68%)
Mutual labels:  event-sourcing, eventsourcing, ddd, cqrs, domain-driven-design
Goes
Go Event Sourcing made easy
Stars: ✭ 144 (-81.05%)
Mutual labels:  event-sourcing, eventsourcing, ddd, cqrs, domain-driven-design
Bifrost
This is the stable release of Dolittle till its out of alpha->beta stages
Stars: ✭ 111 (-85.39%)
Mutual labels:  event-sourcing, eventsourcing, ddd, cqrs, domain-driven-design
eventuous
Minimalistic Event Sourcing library for .NET
Stars: ✭ 236 (-68.95%)
Mutual labels:  cqrs, ddd, domain-driven-design, event-sourcing, eventsourcing
delta
DDD-centric event-sourcing library for the JVM
Stars: ✭ 15 (-98.03%)
Mutual labels:  cqrs, ddd, domain-driven-design, event-sourcing, eventsourcing
Eventhorizon
CQRS/ES toolkit for Go
Stars: ✭ 961 (+26.45%)
Mutual labels:  event-sourcing, eventsourcing, ddd, cqrs, domain-driven-design
Productcontext Eventsourcing
A practical/experimental Event Sourcing application on Product Bounded Context in an e-commerce
Stars: ✭ 88 (-88.42%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
Modular Monolith With Ddd
Full Modular Monolith application with Domain-Driven Design approach.
Stars: ✭ 6,210 (+717.11%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
Event Sourcing Jambo
An Hexagonal Architecture with DDD + Aggregates + Event Sourcing using .NET Core, Kafka e MongoDB (Blog Engine)
Stars: ✭ 159 (-79.08%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
Kreta
Modern project management solution
Stars: ✭ 177 (-76.71%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
Akkatecture
a cqrs and event sourcing framework for dotnet core using akka.net
Stars: ✭ 414 (-45.53%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
Messagebus
A MessageBus (CommandBus, EventBus and QueryBus) implementation in PHP7
Stars: ✭ 178 (-76.58%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
Event Sourcing Castanha
An Event Sourcing service template with DDD, TDD and SOLID. It has High Cohesion and Loose Coupling, it's a good start for your next Microservice application.
Stars: ✭ 68 (-91.05%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
EcommerceDDD
Experimental full-stack application using Domain-Driven Design, CQRS, and Event Sourcing.
Stars: ✭ 178 (-76.58%)
Mutual labels:  cqrs, ddd, domain-driven-design, event-sourcing
e-shop
Sample Spring Cloud microservices e-shop.
Stars: ✭ 48 (-93.68%)
Mutual labels:  cqrs, ddd, domain-driven-design, event-sourcing
Eventflow
Async/await first CQRS+ES and DDD framework for .NET
Stars: ✭ 1,932 (+154.21%)
Mutual labels:  eventsourcing, ddd, cqrs, domain-driven-design
Rails event store
A Ruby implementation of an Event Store based on Active Record
Stars: ✭ 947 (+24.61%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
Dotnet New Caju
Learn Clean Architecture with .NET Core 3.0 🔥
Stars: ✭ 228 (-70%)
Mutual labels:  event-sourcing, ddd, cqrs, domain-driven-design
micro
Functional prooph for microservices
Stars: ✭ 53 (-93.03%)
Mutual labels:  cqrs, ddd, event-sourcing, eventsourcing

Build Status Coverage Status Documentation Status Latest Release Latest Release

Event Sourcing in Python

A library for event sourcing in Python.

Please refer to the documentation for installation and usage guides.

Synopsis

You can use the library's Aggregate class to define event sourced aggregates.

The example below uses the library's declarative syntax to define an event sourced aggregate called World. The World class uses the Aggregate class and the event decorator from the eventsourcing.domain module.

The World class has a command method make_it_so() which triggers an event called SomethingHappened that appends the given value of what to the history attribute of an instance of World.

It is generally recommended to use an imperative style when naming command methods, and to name event classes using past participles.

from eventsourcing.domain import Aggregate, event

class World(Aggregate):
    def __init__(self):
        self.history = []

    @event("SomethingHappened")
    def make_it_so(self, what):
        self.history.append(what)

When the World aggregate class is called, a created event object is created and used to construct an instance of World. And when the command method make_it_so() is called on an instance of World, a SomethingHappened event is triggered with a value of what, which results in the what value being appended to the history of the World aggregate instance.

# Create a new aggregate.
world = World()

# Execute aggregate commands.
world.make_it_so('dinosaurs')
world.make_it_so('trucks')
world.make_it_so('internet')

# View aggregate state.
assert world.history[0] == 'dinosaurs'
assert world.history[1] == 'trucks'
assert world.history[2] == 'internet'

The resulting events can be collected using the collect_events() method. The collect_events() method is used by the Application save() method (see below).

# Collect events.
events = world.collect_events()
assert len(events) == 4
assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'
assert type(events[2]).__name__ == 'SomethingHappened'
assert type(events[3]).__name__ == 'SomethingHappened'

The aggregate events can be used to reconstruct the state of the aggregate.

copy = None
for event in events:
    copy = event.mutate(copy)

assert copy.history == world.history
assert copy.id == world.id
assert copy.version == world.version
assert copy.created_on == world.created_on
assert copy.modified_on == world.modified_on

This example can be adjusted and extended for any event sourced domain model.

Installation

Use pip to install the stable distribution from the Python Package Index.

$ pip install eventsourcing

Features

Domain models and applications — base classes for domain model aggregates and applications. Suggests how to structure an event-sourced application.

Flexible event store — flexible persistence of domain events. Combines an event mapper and an event recorder in ways that can be easily extended. Mapper uses a transcoder that can be easily extended to support custom model object types. Recorders supporting different databases can be easily substituted and configured with environment variables.

Application-level encryption and compression — encrypts and decrypts events inside the application. This means data will be encrypted in transit across a network ("on the wire") and at disk level including backups ("at rest"), which is a legal requirement in some jurisdictions when dealing with personally identifiable information (PII) for example the EU's GDPR. Compression reduces the size of stored domain events and snapshots, usually by around 25% to 50% of the original size. Compression reduces the size of data in the database and decreases transit time across a network.

Snapshotting — reduces access-time for aggregates with many domain events.

Versioning - allows domain model changes to be introduced after an application has been deployed. Both domain events and aggregate classes can be versioned. The recorded state of an older version can be upcast to be compatible with a new version. Stored events and snapshots are upcast from older versions to new versions before the event or aggregate object is reconstructed.

Optimistic concurrency control — ensures a distributed or horizontally scaled application doesn't become inconsistent due to concurrent method execution. Leverages optimistic concurrency controls in adapted database management systems.

Notifications and projections — reliable propagation of application events with pull-based notifications allows the application state to be projected accurately into replicas, indexes, view models, and other applications. Supports materialized views and CQRS.

Event-driven systems — reliable event processing. Event-driven systems can be defined independently of particular persistence infrastructure and mode of running.

Detailed documentation — documentation provides general overview, introduction of concepts, explanation of usage, and detailed descriptions of library classes.

Worked examples — includes examples showing how to develop aggregates, applications and systems.

Domain model

The examples below explain how the declarative syntax works using the underlying methods and mechanisms provided by the library's Aggregate base class for event sourced aggregates. Please read the documentation for more information.

The World aggregate below has a command method make_it_so() which has an argument what. The make_it_so() method triggers a domain event SomethingHappened with the value of the what arg. The event class SomethingHappened is interpreted as a Python dataclass, so that the annotation what: str is used to define the class __init__() method. The SomethingHappened has an apply() method that has a World aggregate argument. The apply() method body appends the value of the event's what attribute to the self.history of the World aggregate. The method trigger_event() is defined on the library's Aggregate base class, and constructs the given event class and applies the event to the aggregate.

from eventsourcing.domain import Aggregate, AggregateEvent

class World(Aggregate):
    def __init__(self):
        self.history = []

    def make_it_so(self, what: str):
        self.trigger_event(self.SomethingHappened, what=what)

    class SomethingHappened(AggregateEvent):
        what: str

        def apply(self, aggregate: "World"):
            aggregate.history.append(self.what)


world = World()
world.make_it_so('dinosaurs')
assert world.history[0] == 'dinosaurs'
events = world.collect_events()
assert len(events) == 2
assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'

An alternative way of expressing the same thing is to define a second method on the aggregate which is decorated with the library's @event decorator. This decorated method will do the work of triggering and applying an event. The command method can then call the decorated method rather than triggering an event directly using the trigger_event() method. In this case, the event class is defined without an apply() method, the decorator mentions the event class, and the decorated method arguments must match the event attributes. When the decorated method is called, the decorator triggers the event, and the body of the decorated method is used to apply the event attributes to the aggregate.

from eventsourcing.domain import Aggregate, AggregateEvent, event

class World(Aggregate):
    def __init__(self):
        self.history = []

    def make_it_so(self, what: str):
        self.something_happened(what)

    class SomethingHappened(AggregateEvent):
        what: str

    @event(SomethingHappened)
    def something_happened(self, what: str):
        self.history.append(what)


world = World()
world.make_it_so('dinosaurs')
assert world.history[0] == 'dinosaurs'
events = world.collect_events()
assert len(events) == 2
assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'

It is sometimes useful to have the event class defined explicitly, but whenever there is no need to refer to the event class in other places, the event class can be automatically generated from the method signature.

A simpler way of expressing the same thing as above is simply to define an event name in the @event decorator using a string. In this case, the decorator will automatically define an event class using the decorated method arguments and the given event name.

from eventsourcing.domain import Aggregate, event

class World(Aggregate):
    def __init__(self):
        self.history = []

    def make_it_so(self, what: str):
        self.something_happened(what)

    @event("SomethingHappened")
    def something_happened(self, what: str):
        self.history.append(what)


world = World()
world.make_it_so('dinosaurs')
assert world.history[0] == 'dinosaurs'
events = world.collect_events()
assert len(events) == 2
assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'

An alternative way of expressing the same thing is to simply use the @event decorator without mentioning an event class or an event class name. In this case, an event class name will be constructed from the decorated method name. The event class name is constructed by splitting the method name by its underscores, capitalising each part, and then joining the parts to make the class name SomethingHappened.

from eventsourcing.domain import Aggregate, event

class World(Aggregate):
    def __init__(self):
        self.history = []

    def make_it_so(self, what: str):
        self.something_happened(what)

    @event
    def something_happened(self, what: str):
        self.history.append(what)


world = World()
world.make_it_so('dinosaurs')
assert world.history[0] == 'dinosaurs'
events = world.collect_events()
assert len(events) == 2
assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'

For trivial commands that would simply trigger an event wilh the given arguments, an alternative way of expressing the same thing as above is to decorate the command method with the @event decorator. In this case, when the command method make_it_so() is called an event will be triggered, and the command method body will be used to apply the event to the aggregate. Because command methods should be named with imperatives, and events should be named with part participles, it is recommended to define the name of the event in the decorator. Commands that need to do some work on the given arguments before triggering an event will need to use one of the styles above. Of course, in some cases it may be more natural to use a past participle as the method name.

from eventsourcing.domain import Aggregate, event

class World(Aggregate):
    def __init__(self):
        self.history = []

    @event("SomethingHappened")
    def make_it_so(self, what):
        self.history.append(what)


world = World()
world.make_it_so('dinosaurs')
assert world.history[0] == 'dinosaurs'
events = world.collect_events()
assert len(events) == 2
assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'

Application

You can use the library's Application class to define event sourced applications. The Application class brings together a domain model and persistence infrastructure.

The Worlds application class in the example below uses the library's Application class and the World aggregate defined above.

The create_world() method creates and saves a new World aggregate instance, and returns the UUID of the new instance. The make_it_so() method uses the given world_id to retrieve a World aggregate instance from the application's repository (reconstructs aggregate stored events), then calls the aggregate's make_it_so() method, and then saves the aggregate (collects and stores new aggregate event). The get_history() method uses the given world_id to retrieve an World aggregate from the application's repository and then returns the aggregate's history.

from eventsourcing.application import Application

class Worlds(Application):

    def create_world(self):
        world = World()
        self.save(world)
        return world.id

    def make_it_so(self, world_id, what):
        world = self.repository.get(world_id)
        world.make_it_so(what)
        self.save(world)

    def get_history(self, world_id):
        world = self.repository.get(world_id)
        return world.history

Let's define a test that uses the Worlds` application. Below, we will run this test with different persistence infrastructure.

from eventsourcing.persistence import RecordConflictError
from eventsourcing.system import NotificationLogReader


def test(app: Worlds, expect_visible_in_db: bool):

    # Check app has zero event notifications.
    assert len(app.log['1,10'].items) == 0, len(app.log['1,10'].items)

    # Create a new aggregate.
    world_id = app.create_world()

    # Execute application commands.
    app.make_it_so(world_id, 'dinosaurs')
    app.make_it_so(world_id, 'trucks')

    # Check recorded state of the aggregate.
    assert app.get_history(world_id) == [
        'dinosaurs',
        'trucks'
    ]

    # Execute another command.
    app.make_it_so(world_id, 'internet')

    # Check recorded state of the aggregate.
    assert app.get_history(world_id) == [
        'dinosaurs',
        'trucks',
        'internet'
    ]

    # Check values are (or aren't visibible) in the database.
    values = [b'dinosaurs', b'trucks', b'internet']
    if expect_visible_in_db:
        expected_num_visible = len(values)
    else:
        expected_num_visible = 0

    actual_num_visible = 0
    reader = NotificationLogReader(app.log)
    for notification in reader.read(start=1):
        for what in values:
            if what in notification.state:
                actual_num_visible += 1
                break
    assert expected_num_visible == actual_num_visible

    # Get historical state (at version from above).
    old = app.repository.get(world_id, version=3)
    assert old.history[-1] == 'trucks' # internet not happened
    assert len(old.history) == 2

    # Check app has four event notifications.
    assert len(app.log['1,10'].items) == 4, len(app.log['1,10'].items)

    # Optimistic concurrency control (no branches).
    old.make_it_so('future')
    try:
        app.save(old)
    except RecordConflictError:
        pass
    else:
        raise Exception("Shouldn't get here")

    # Check app still has only four event notifications.
    assert len(app.log['1,10'].items) == 4, len(app.log['1,10'].items)

    # Read event notifications.
    reader = NotificationLogReader(app.log)
    notifications = list(reader.read(start=1))
    assert len(notifications) == 4

    # Create eight more aggregate events.
    world_id = app.create_world()
    app.make_it_so(world_id, 'plants')
    app.make_it_so(world_id, 'fish')
    app.make_it_so(world_id, 'mammals')

    world_id = app.create_world()
    app.make_it_so(world_id, 'morning')
    app.make_it_so(world_id, 'afternoon')
    app.make_it_so(world_id, 'evening')

    # Get the new event notifications from the reader.
    last_id = notifications[-1].id
    notifications = list(reader.read(start=last_id + 1))
    assert len(notifications) == 8

    # Get all the event notifications from the reader.
    last_id = notifications[-1].id
    notifications = list(reader.read(start=1))
    assert len(notifications) == 12

Development environment

We can run the code in default "development" environment using the default "Plain Old Python Object" infrastructure. The example below runs with no compression or encryption of the stored events.

# Construct an application object.
app = Worlds()

# Run the test.
test(app, expect_visible_in_db=True)

SQLite environment

Configure "production" environment using SQLite infrastructure. The example below uses zlib and AES to compress and encrypt the stored events. An in-memory SQLite database is used, but it would work the same way if a file path were set as the SQLITE_DBNAME.

import os

from eventsourcing.cipher import AESCipher

# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)

# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key
# Cipher topic.
os.environ['CIPHER_TOPIC'] = "eventsourcing.cipher:AESCipher"
# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = "eventsourcing.compressor:ZlibCompressor"

# Use SQLite infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = 'eventsourcing.sqlite:Factory'
os.environ['SQLITE_DBNAME'] = ':memory:'  # Or path to a file on disk.

Run the code in "production" SQLite environment.

# Construct an application object.
app = Worlds()

# Run the test.
test(app, expect_visible_in_db=False)

PostgreSQL environment

Configure "production" environment using PostgresSQL infrastructure. The example below uses zlib and AES to compress and encrypt the stored events. It is assumed the database and database user have already been created, and the database server is running locally.

import os

from eventsourcing.cipher import AESCipher

# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)

# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key
# Cipher topic.
os.environ['CIPHER_TOPIC'] = "eventsourcing.cipher:AESCipher"
# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = "eventsourcing.compressor:ZlibCompressor"

# Use Postgres infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = (
    'eventsourcing.postgres:Factory'
)
os.environ["POSTGRES_DBNAME"] = "eventsourcing"
os.environ["POSTGRES_HOST"] = "127.0.0.1"
os.environ["POSTGRES_USER"] = "eventsourcing"
os.environ["POSTGRES_PASSWORD"] = "eventsourcing"

Run the code in "production" PostgreSQL environment.

# Construct an application object.
app = Worlds()

# Run the test.
test(app, expect_visible_in_db=False)

Project

This project is hosted on GitHub.

Please register questions, requests and issues on GitHub, or post in the project's Slack channel.

There is a Slack channel for this project, which you are welcome to join.

Please refer to the documentation for installation and usage guides.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].