neos/event-sourcing

Letzte Aktivität 19 Feb 2024 44 28 83305
composer require neos/event-sourcing

Lean and opinionated way to integrate Event Sourcing and CQRS pattern in your Flow framework package

Event Sourcing and CQRS for Flow Framework

Library providing interfaces and implementations for event-sourced applications.

Getting started

Install this package via composer:

composer require neos/event-sourcing

Setting up an Event Store

Since there could be multiple Event Stores simultaneously in one application, this package no longer comes with a pre-configured "default" store. It is just a matter of a couple of lines of YAML to configure a custom store:

Configuration/Settings.yaml:

Neos:
  EventSourcing:
    EventStore:
      stores:
        'Some.Package:EventStore':
          storage: 'Neos\EventSourcing\EventStore\Storage\Doctrine\DoctrineEventStorage'

This registers an Event Store, identified as "Some.Package:EventStore"1, that uses the provided Doctrine storage adapter that persists events in a database table2.

To make use of the newly configured Event Store one more step is required in order to finish the setup (in this case to create the corresponding database table):

./flow eventstore:setup Some.Package:EventStore
ℹ️  Note...

By default, the Event Store persists events in the same database that is used for Flow persistence. But because that can be configured otherwise, this table is not generated via Doctrine migrations. If your application relies on the events table to exist, you can of course still add a Doctrine migration for it.

Obtaining the Event Store

To get hold of an instance of a specific Event Store the EventStoreFactory can be used:

use Neos\EventSourcing\EventStore\EventStoreFactory;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject
     * @var EventStoreFactory
     */
    protected $eventStoreFactory;

    function someMethod() {
        $eventStore = $this->eventStoreFactory->create('Some.Package:EventStore');
    }
}
ℹ️  Alternative ways...

Alternatively you can inject the Event Store directly:

use Neos\EventSourcing\EventStore\EventStore;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject
     * @var EventStore
     */
    protected $eventStore;

    function someMethod() {
        // $this->eventStore->...
    }
}

But in this case you have to specify which event store to be injected. This can be done easily using Flow's Object Framework:

Configuration/Objects.yaml:

Some\Package\SomeClass:
  properties:
    'eventStore':
      object:
        factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory
        arguments:
          1:
            value: 'Some.Package:EventStore'

If you use Flow 6.2 or newer, you can make use of the virtual object configuration to simplify the process:

Configuration/Objects.yaml:

'Some.Package:EventStore':
  className: Neos\EventSourcing\EventStore\EventStore
  factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory
  arguments:
    1:
      value: 'Some.Package:EventStore'
use Neos\EventSourcing\EventStore\EventStore;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject(name="Some.Package:EventStore")
     * @var EventStore
     */
    protected $eventStore;
}

And, finally, if you happen to use the event store from many classes, you could of course create a custom Event Store facade like:

Classes/CustomEventStore.php

<?php
namespace Some\Package;

use Neos\EventSourcing\Event\DomainEvents;
use Neos\EventSourcing\EventStore\EventStore;
use Neos\EventSourcing\EventStore\EventStoreFactory;
use Neos\EventSourcing\EventStore\EventStream;
use Neos\EventSourcing\EventStore\ExpectedVersion;
use Neos\EventSourcing\EventStore\StreamName;
use Neos\Flow\Annotations as Flow;

/**
 * @Flow\Scope("singleton")
 */
final class CustomEventStore
{

    /**
     * @var EventStore
     */
    private $instance;

    public function __construct(EventStoreFactory $eventStoreFactory)
    {
        $this->instance = $eventStoreFactory->create('Some.Package:EventStore');
    }

    public function load(StreamName $streamName, int $minimumSequenceNumber = 0): EventStream
    {
        return $this->instance->load($streamName, $minimumSequenceNumber);
    }

    public function commit(StreamName $streamName, DomainEvents $events, int $expectedVersion = ExpectedVersion::ANY): void
    {
        $this->instance->commit($streamName, $events, $expectedVersion);
    }
}

and inject that.

Writing events

Example event: SomethingHasHappened.php
<?php
namespace Some\Package;

use Neos\EventSourcing\Event\DomainEventInterface;

final class SomethingHasHappened implements DomainEventInterface
{
    /**
     * @var string
     */
    private $message;

    public function __construct(string $message)
    {
        $this->message = $message;
    }

    public function getMessage(): string
    {
        return $this->message;
    }

}
<?php
$event = new SomethingHasHappened('some message');
$streamName = StreamName::fromString('some-stream');
$eventStore->commit($streamName, DomainEvents::withSingleEvent($event));

Reading events

<?php
$streamName = StreamName::fromString('some-stream');
$eventStream = $eventStore->load($streamName);
foreach ($eventStream as $eventEnvelope) {
    // the event as it's stored in the Event Store, including its global sequence number and the serialized payload
    $rawEvent = $eventEnvelope->getRawEvent();

    // the deserialized DomainEventInterface instance 
    $domainEvent = $eventEnvelope->getDomainEvent();
}

Reacting to events

In order to react upon new events you'll need an Event Listener:

<?php
namespace Some\Package;

use Neos\EventSourcing\EventListener\EventListenerInterface;
use Some\Package\SomethingHasHappened;

class SomeEventListener implements EventListenerInterface
{

    public function whenSomethingHasHappened(SomethingHasHappened $event): void
    {
        // do something with the $event
    }

}

The when*() methods of classes implementing the EventListenerInterface will be invoked whenever a corresponding event is committed to the Event Store.

Since it is possible to have multiple Event Stores the listener has to be registered with the corresponding Store:

Configuration/Settings.yaml:

Neos:
  EventSourcing:
    EventStore:
      stores:
        'Some.Package:EventStore':
          # ...
          listeners:
            'Some\Package\SomeEventListener': true

This registers the Some\Package\SomeEventListener so that it is updated whenever a corresponding event was committed to the "Some.Package:EventStore".

To register all/multiple listeners with an Event Store, you can use regular expressions, too:

Configuration/Settings.yaml:

Neos:
  EventSourcing:
    EventStore:
      stores:
        'Some.Package:EventStore':
          # ...
          listeners:
            'Some\Package\.*': true

Keep in mind though that a listener can only ever by registered with a single Event Store (otherwise you'll get an exception at "compile time").

Event Sourced Aggregate

The neos/event-sourcing package comes with a base class that can be used to implement Event-Sourced Aggregates.

Aggregate Construction

The AbstractEventSourcedAggregateRoot class has a private constructor. To create a fresh aggregate instance you should define a named constructor:

<?php
declare(strict_types=1);
namespace Some\Package;

use Neos\EventSourcing\AbstractEventSourcedAggregateRoot;

final class SomeAggregate extends AbstractEventSourcedAggregateRoot
{
    /**
     * @var SomeAggregateId
     */
    private $id;

    public static function create(SomeAggregateId $id): self
    {
        $instance = new static();
        // This method will only be invoked once. Upon reconstitution only the when*() methods are called.
        // So we must never change the instance state directly (i.e. $instance->id = $id) but use events:
        $instance->recordThat(new SomeAggregateWasCreated($id));
        return $instance;
    }

    public function whenSomeAggregateWasCreated(SomeAggregateWasCreated $event): void
    {
        $this->id = $event->getId();
    }
}

Aggregate Repository

This Framework does not provide an abstract Repository class for Aggregates, because an implementation is just a couple of lines of code and there is no useful abstraction that can be extracted. The Repository is just a slim wrapper around the EventStore and the Aggregate class:

final class ProductRepository
{
    /**
     * @var EventStore
     */
    private $eventStore;

    public function __construct(EventStore $eventStore)
    {
        $this->eventStore = $eventStore;
    }

    public function load(SomeAggregateId $id): SomeAggregate
    {
        $streamName = $this->getStreamName($id);
        return SomeAggregate::reconstituteFromEventStream($this->eventStore->load($streamName));
    }

    public function save(SomeAggregate $aggregate): void
    {
        $streamName = $this->getStreamName($aggregate->id());
        $this->eventStore->commit($streamName, $aggregate->pullUncommittedEvents(), $aggregate->getReconstitutionVersion());
    }

    private function getStreamName(SomeAggregateId $id): StreamName
    {
        // we assume that the aggregate stream name is "some-aggregate-<aggregate-id>"
        return StreamName::fromString('some-aggregate-' . $id);
    }

}

Tutorial

See Tutorial.md

Glossary

See Glossary.md


1: The Event Store identifier is arbitrary, but it's good practice prefixing it with a package key in order to prevent naming clashes 2: The Doctrine Event storage uses the same database connection that is configured for Flow and persists events in a table neos_eventsourcing_eventstore_events by default – this can be adjusted, see Settings.yaml

The content of the readme is provided by Github
The same vendor provides 93 package(s).