Skip to content

implement Event Sourcing pattern#211

Open
hatung wants to merge 1 commit intoSairyss:masterfrom
hatung:master
Open

implement Event Sourcing pattern#211
hatung wants to merge 1 commit intoSairyss:masterfrom
hatung:master

Conversation

@hatung
Copy link
Copy Markdown

@hatung hatung commented Apr 8, 2026


Overview

This guide shows how to wire up the event-store repository for a new aggregate. The event-sourcing layer lives entirely in src/libs/event-sourcing/ — no external event-sourcing package is installed. The pattern mirrors the existing UserRepository / SqlRepositoryBase convention.


1. Transaction child entity

// src/modules/wallet/domain/entities/transaction.entity.ts
import { Entity, AggregateID } from '@libs/ddd';

export type TransactionType = 'credit' | 'debit';

export interface TransactionProps {
  readonly type: TransactionType;
  readonly amount: number;
  readonly balanceAfter: number;
  readonly occurredOn: Date;
}

export class Transaction extends Entity<TransactionProps> {
  // No mutating methods — immutable once created

  get type(): TransactionType { return this.props.type; }
  get amount(): number { return this.props.amount; }
  get balanceAfter(): number { return this.props.balanceAfter; }
  get occurredOn(): Date { return this.props.occurredOn; }

  validate(): void {
    // Invariants enforced at aggregate level before entity creation
  }
}

2. New domain events (wallet module)

// src/modules/wallet/domain/events/money-deposited.domain-event.ts
import { DomainEvent, DomainEventProps } from '@libs/ddd';
import { Event } from '@libs/event-sourcing';

@Event('money-deposited')
export class MoneyDepositedDomainEvent extends DomainEvent {
  readonly transactionId: string;
  readonly amount: number;
  readonly balanceAfter: number;
  constructor(props: DomainEventProps<MoneyDepositedDomainEvent>) { super(props); }
}

// src/modules/wallet/domain/events/money-withdrawn.domain-event.ts
@Event('money-withdrawn')
export class MoneyWithdrawnDomainEvent extends DomainEvent {
  readonly transactionId: string;
  readonly amount: number;
  readonly balanceAfter: number;
  constructor(props: DomainEventProps<MoneyWithdrawnDomainEvent>) { super(props); }
}

Add @Event('wallet-created') to the existing WalletCreatedDomainEvent.


3. WalletAggregate

// src/modules/wallet/domain/wallet.aggregate.ts
import { AggregateID, AggregateRoot } from '@libs/ddd';
import { Err, Ok, Result } from 'oxide.ts';
import { Aggregate, EventHandler } from '@libs/event-sourcing';
import { randomUUID } from 'crypto';
import { Transaction } from './entities/transaction.entity';
import { WalletCreatedDomainEvent } from './events/wallet-created.domain-event';
import { MoneyDepositedDomainEvent } from './events/money-deposited.domain-event';
import { MoneyWithdrawnDomainEvent } from './events/money-withdrawn.domain-event';
import { WalletNotEnoughBalanceError } from './wallet.errors';

export interface WalletProps {
  userId: string;
  balance: number;
  transactions: Transaction[];
}

@Aggregate('wallet')
export class WalletAggregate extends AggregateRoot<WalletProps> {
  protected _id: AggregateID = '';

  static create(userId: string): WalletAggregate {
    const id = randomUUID();
    const wallet = new WalletAggregate();
    wallet.applyEvent(new WalletCreatedDomainEvent({ aggregateId: id, userId }));
    return wallet;
  }

  deposit(amount: number): void {
    const transactionId = randomUUID();
    const balanceAfter = this.props.balance + amount;
    this.applyEvent(new MoneyDepositedDomainEvent({
      aggregateId: this._id,
      transactionId,
      amount,
      balanceAfter,
    }));
  }

  withdraw(amount: number): Result<null, WalletNotEnoughBalanceError> {
    if (this.props.balance - amount < 0) {
      return Err(new WalletNotEnoughBalanceError());
    }
    const transactionId = randomUUID();
    const balanceAfter = this.props.balance - amount;
    this.applyEvent(new MoneyWithdrawnDomainEvent({
      aggregateId: this._id,
      transactionId,
      amount,
      balanceAfter,
    }));
    return Ok(null);
  }

  getTransactions(): ReadonlyArray<Transaction> {
    return this.props.transactions;
  }

  @EventHandler(WalletCreatedDomainEvent)
  private onWalletCreated(event: WalletCreatedDomainEvent): void {
    this._id = event.aggregateId;
    (this as any).props = { userId: event.userId, balance: 0, transactions: [] };
  }

  @EventHandler(MoneyDepositedDomainEvent)
  private onMoneyDeposited(event: MoneyDepositedDomainEvent): void {
    const tx = new Transaction({
      id: event.transactionId,
      props: {
        type: 'credit',
        amount: event.amount,
        balanceAfter: event.balanceAfter,
        occurredOn: new Date(event.metadata.timestamp),
      },
    });
    this.props.transactions.push(tx);
    this.props.balance = event.balanceAfter;
  }

  @EventHandler(MoneyWithdrawnDomainEvent)
  private onMoneyWithdrawn(event: MoneyWithdrawnDomainEvent): void {
    const tx = new Transaction({
      id: event.transactionId,
      props: {
        type: 'debit',
        amount: event.amount,
        balanceAfter: event.balanceAfter,
        occurredOn: new Date(event.metadata.timestamp),
      },
    });
    this.props.transactions.push(tx);
    this.props.balance = event.balanceAfter;
  }

  validate(): void {}
}

4. WalletEventStoreRepository

// src/modules/wallet/database/wallet.repository.ts
import { Injectable, Logger, Inject } from '@nestjs/common';
import { EventStoreRepositoryBase, EVENT_STORE, SNAPSHOT_STORE, EVENT_BUS } from '@libs/event-sourcing';
import { EventMap } from '@libs/event-sourcing/services/event-map';
import { WalletAggregate } from '../domain/wallet.aggregate';
import { Transaction } from '../domain/entities/transaction.entity';

@Injectable()
export class WalletEventStoreRepository extends EventStoreRepositoryBase<WalletAggregate> {
  protected readonly poolName = 'wallets';

  constructor(
    @Inject(EVENT_STORE) eventStore: any,
    @Inject(SNAPSHOT_STORE) snapshotStore: any,
    @Inject(EVENT_BUS) eventBus: any,
    @Inject(Logger) logger: any,
    eventMap: EventMap,
  ) {
    super(eventStore, snapshotStore, WalletAggregate, eventBus, logger, eventMap);
  }

  protected toSnapshotPayload(aggregate: WalletAggregate) {
    const props = aggregate.getProps();
    return {
      userId: props.userId,
      balance: props.balance,
      version: aggregate.version,
      transactions: props.transactions.map(tx => ({
        id: tx.id,
        type: tx.type,
        amount: tx.amount,
        balanceAfter: tx.balanceAfter,
        occurredOn: tx.occurredOn.toISOString(),
      })),
    };
  }

  protected fromSnapshot(snapshot: any): WalletAggregate {
    const aggregate = new WalletAggregate();
    (aggregate as any)._id = snapshot.aggregateId;
    const transactions = (snapshot.payload.transactions ?? []).map((t: any) =>
      new Transaction({
        id: t.id,
        props: { type: t.type, amount: t.amount, balanceAfter: t.balanceAfter, occurredOn: new Date(t.occurredOn) },
      }),
    );
    (aggregate as any).props = {
      userId: snapshot.payload.userId,
      balance: snapshot.payload.balance,
      transactions,
    };
    aggregate.version = snapshot.aggregateVersion;
    return aggregate;
  }
}

5. WalletProjectionHandler

// src/modules/wallet/projections/wallet.projection.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Inject, Logger } from '@nestjs/common';
import { InjectPool } from 'nestjs-slonik';
import { DatabasePool, sql } from 'slonik';
import { Subscription } from 'rxjs';
import { EventBus, EVENT_BUS } from '@libs/event-sourcing';

@Injectable()
export class WalletProjectionHandler implements OnModuleInit, OnModuleDestroy {
  private subscription: Subscription;

  constructor(
    @Inject(EVENT_BUS) private readonly eventBus: EventBus,
    @InjectPool() private readonly pool: DatabasePool,
    private readonly logger: Logger,
  ) {}

  onModuleInit(): void {
    this.subscription = this.eventBus.subscribe({
      next: (envelope) => this.handle(envelope).catch(err =>
        this.logger.error(`WalletProjection error: ${err?.message}`, err),
      ),
    });
  }

  onModuleDestroy(): void { this.subscription?.unsubscribe(); }

  private async handle(envelope: any): Promise<void> {
    const { event, payload, metadata } = envelope;

    if (event === 'wallet-created') {
      await this.pool.query(sql`
        INSERT INTO wallets (id, "userId", balance, "createdAt", "updatedAt")
        VALUES (${metadata.aggregateId}, ${payload.userId}, 0,
                ${metadata.occurredOn.toISOString()}::timestamptz,
                ${metadata.occurredOn.toISOString()}::timestamptz)
        ON CONFLICT (id) DO NOTHING
      `);
    } else if (event === 'money-deposited' || event === 'money-withdrawn') {
      const type = event === 'money-deposited' ? 'credit' : 'debit';

      await this.pool.query(sql`
        UPDATE wallets SET balance = ${payload.balanceAfter}, "updatedAt" = NOW()
        WHERE id = ${metadata.aggregateId}
      `);

      await this.pool.query(sql`
        INSERT INTO transaction_reads
          (id, event_id, wallet_id, type, amount, balance_after, occurred_on)
        VALUES (
          ${payload.transactionId},
          ${metadata.eventId},
          ${metadata.aggregateId},
          ${type},
          ${payload.amount},
          ${payload.balanceAfter},
          ${metadata.occurredOn.toISOString()}::timestamptz
        )
        ON CONFLICT (event_id) DO NOTHING
      `);
    }
  }
}

6. Updated DepositMoneyService

// Changes only — remove @InjectPool(), remove pool.query(UPDATE wallets...), remove txRepo
async execute(command: DepositMoneyCommand): Promise<DepositMoneyResult> {
  const wallet = await this.walletRepo.findById(command.walletId);
  if (!wallet) return Err(new WalletNotFoundError({ walletId: command.walletId }));

  wallet.deposit(command.amount);                    // emits MoneyDepositedDomainEvent
  await this.walletRepo.save(wallet);                // appends event + publishes via EventBus

  return Ok({ walletId: command.walletId, balance: wallet.getProps().balance });
}

Same pattern for WithdrawMoneyService using wallet.withdraw(amount).


7. WalletModule wiring

@Module({
  imports: [
    EventSourcingModule.forFeature({
      events: [WalletCreatedDomainEvent, MoneyDepositedDomainEvent, MoneyWithdrawnDomainEvent],
    }),
  ],
  controllers: [DepositMoneyHttpController, WithdrawMoneyHttpController, GetWalletTransactionsHttpController],
  providers: [
    Logger,
    CreateWalletWhenUserIsCreatedDomainEventHandler,
    WalletMapper,
    WalletProjectionHandler,
    WalletTransactionReadMapper,
    { provide: WALLET_REPOSITORY, useClass: WalletEventStoreRepository },
    { provide: WALLET_READ_REPOSITORY, useClass: WalletTransactionReadRepository },
    DepositMoneyService,
    WithdrawMoneyService,
    GetWalletTransactionsQueryHandler,
  ],
  exports: [WALLET_REPOSITORY],
})
export class WalletModule {}

Remove TransactionModule from AppModule imports.


8. Tests to write

# Unit: aggregate @EventHandler methods
tests/unit/modules/wallet/wallet.aggregate.spec.ts

# Unit: Transaction entity construction
tests/unit/modules/wallet/transaction.entity.spec.ts

# Unit: projection handler
tests/unit/modules/wallet/wallet.projection.spec.ts

# E2e: existing tests must pass
npm run test:e2e

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant