diff --git a/.dependency-cruiser.js b/.dependency-cruiser.js index 9a268ba6..627026e4 100644 --- a/.dependency-cruiser.js +++ b/.dependency-cruiser.js @@ -87,6 +87,39 @@ module.exports = { }, }, + { + name: 'no-direct-event-store-driver-import', + comment: + 'Event store drivers live in src/infrastructure/event-sourcing/drivers/. ' + + 'Only src/infrastructure/event-sourcing/ itself and tests/ may import from there. ' + + 'Feature modules and application code must use the IEventStore / ISnapshotStore ports.', + severity: 'error', + from: { + pathNot: [ + 'src/infrastructure/event-sourcing/', + 'src/app\\.module\\.ts$', + 'tests/', + ], + }, + to: { + path: 'src/infrastructure/event-sourcing/drivers/', + }, + }, + { + name: 'no-libs-event-sourcing-imports-infrastructure', + comment: + 'src/libs/event-sourcing/ is a pure domain/application library. ' + + 'It must NOT import from src/infrastructure/. ' + + 'Adapters are injected at runtime via NestJS DI — never imported directly.', + severity: 'error', + from: { + path: 'src/libs/event-sourcing/', + }, + to: { + path: 'src/infrastructure/', + }, + }, + /* rules from the 'recommended' preset: */ // { // name: 'no-circular', diff --git a/.env.example b/.env.example index d54c866a..8d27515f 100644 --- a/.env.example +++ b/.env.example @@ -3,3 +3,8 @@ DB_PORT=5432 DB_USERNAME='user' DB_PASSWORD='password' DB_NAME='ddh' + +# Event Store — separate connection pool for the event-sourcing layer. +# Can point to the same PostgreSQL instance as the main DB using a different schema or DB name. +# Set NODE_ENV=test to use the in-memory driver automatically (no DB needed). +EVENT_STORE_DATABASE_URL='postgres://user:password@localhost:5432/ddh' diff --git a/.env.test b/.env.test index 553b8ec3..39786812 100644 --- a/.env.test +++ b/.env.test @@ -1,6 +1,6 @@ -# env file for e2e testing DB_HOST='localhost' DB_PORT=5432 DB_USERNAME='user' DB_PASSWORD='password' -DB_NAME='ddh_tests' # running tests in a separate test db +DB_NAME='ddh_test' +EVENT_STORE_DATABASE_URL='postgres://user:password@localhost:5432/ddh_test' diff --git a/jest-e2e.json b/jest-e2e.json index 264756fe..9293e75b 100644 --- a/jest-e2e.json +++ b/jest-e2e.json @@ -11,6 +11,7 @@ "@modules/(.*)$": "/src/modules/$1", "@config/(.*)$": "/src/configs/$1", "@libs/(.*)$": "/src/libs/$1", + "@infrastructure/(.*)$": "/src/infrastructure/$1", "@exceptions$": "/src/libs/exceptions", "@tests/(.*)$": "/tests/$1" }, diff --git a/jest.integration.config.js b/jest.integration.config.js new file mode 100644 index 00000000..ede40a59 --- /dev/null +++ b/jest.integration.config.js @@ -0,0 +1,19 @@ +/** @type {import('ts-jest').JestConfigWithTsJest} */ +module.exports = { + moduleFileExtensions: ['js', 'json', 'ts'], + rootDir: '.', + testMatch: ['/tests/integration/**/*.spec.ts'], + transform: { + '^.+\\.(t|j)s$': ['ts-jest', { tsconfig: 'tsconfig.json' }], + }, + testEnvironment: 'node', + testTimeout: 30000, + moduleNameMapper: { + '^@src/(.*)$': '/src/$1', + '^@libs/(.*)$': '/src/libs/$1', + '^@modules/(.*)$': '/src/modules/$1', + '^@config/(.*)$': '/src/configs/$1', + '^@infrastructure/(.*)$': '/src/infrastructure/$1', + '^@tests/(.*)$': '/tests/$1', + }, +}; diff --git a/jest.unit.config.js b/jest.unit.config.js new file mode 100644 index 00000000..c5dc8e91 --- /dev/null +++ b/jest.unit.config.js @@ -0,0 +1,20 @@ +/** @type {import('ts-jest').JestConfigWithTsJest} */ +module.exports = { + moduleFileExtensions: ['js', 'json', 'ts'], + rootDir: '.', + testMatch: ['/tests/unit/**/*.spec.ts'], + transform: { + '^.+\\.(t|j)s$': ['ts-jest', { tsconfig: 'tsconfig.json' }], + }, + collectCoverageFrom: ['src/**/*.(t|j)s'], + coverageDirectory: './coverage', + testEnvironment: 'node', + moduleNameMapper: { + '^@src/(.*)$': '/src/$1', + '^@libs/(.*)$': '/src/libs/$1', + '^@modules/(.*)$': '/src/modules/$1', + '^@config/(.*)$': '/src/configs/$1', + '^@infrastructure/(.*)$': '/src/infrastructure/$1', + '^@tests/(.*)$': '/tests/$1', + }, +}; diff --git a/migrations/transaction_reads.sql b/migrations/transaction_reads.sql new file mode 100644 index 00000000..9ba3e16c --- /dev/null +++ b/migrations/transaction_reads.sql @@ -0,0 +1,19 @@ +-- Migration: transaction_reads +-- Feature: transaction-event-sourcing-projection +-- Creates the read-model table for wallet transaction history. +-- event_store and snapshot_store tables are created by the event-sourcing infrastructure (feature 001). + +CREATE TABLE IF NOT EXISTS transaction_reads ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_id UUID NOT NULL, + wallet_id UUID NOT NULL, + type VARCHAR(10) NOT NULL CHECK (type IN ('credit', 'debit')), + amount NUMERIC(18, 2) NOT NULL CHECK (amount > 0), + balance_after NUMERIC(18, 2) NOT NULL, + occurred_on TIMESTAMPTZ NOT NULL, + CONSTRAINT uq_transaction_reads_event_id UNIQUE (event_id) +); + +-- Primary query pattern: per-wallet history newest-first +CREATE INDEX IF NOT EXISTS idx_transaction_reads_wallet_id_occurred_on + ON transaction_reads (wallet_id, occurred_on DESC); diff --git a/src/app.module.ts b/src/app.module.ts index 5ba9805e..a8eac2b8 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -11,6 +11,13 @@ import { ExceptionInterceptor } from '@libs/application/interceptors/exception.i import { postgresConnectionUri } from './configs/database.config'; import { GraphQLModule } from '@nestjs/graphql'; import { ApolloDriver, ApolloDriverConfig } from '@nestjs/apollo'; +import { + EventSourcingModule, + EVENT_STORE, + SNAPSHOT_STORE, +} from '@libs/event-sourcing'; +import { PostgresEventStore } from '@infrastructure/event-sourcing/drivers/postgres/postgres.event-store'; +import { PostgresSnapshotStore } from '@infrastructure/event-sourcing/drivers/postgres/postgres.snapshot-store'; const interceptors = [ { @@ -36,6 +43,14 @@ const interceptors = [ autoSchemaFile: true, }), + EventSourcingModule.forRoot({ + eventStore: { provide: EVENT_STORE, useClass: PostgresEventStore }, + snapshotStore: { + provide: SNAPSHOT_STORE, + useClass: PostgresSnapshotStore, + }, + }), + // Modules UserModule, WalletModule, diff --git a/src/configs/app.routes.ts b/src/configs/app.routes.ts index 6843a1f0..1db50eeb 100644 --- a/src/configs/app.routes.ts +++ b/src/configs/app.routes.ts @@ -20,4 +20,9 @@ export const routesV1 = { root: walletsRoot, delete: `/${walletsRoot}/:id`, }, + transaction: { + deposit: `${walletsRoot}/:walletId/deposit`, + withdraw: `${walletsRoot}/:walletId/withdraw`, + getTransactions: `${walletsRoot}/:walletId/transactions`, + }, }; diff --git a/src/infrastructure/event-sourcing/drivers/in-memory/in-memory.event-store.ts b/src/infrastructure/event-sourcing/drivers/in-memory/in-memory.event-store.ts new file mode 100644 index 00000000..677d8eaf --- /dev/null +++ b/src/infrastructure/event-sourcing/drivers/in-memory/in-memory.event-store.ts @@ -0,0 +1,92 @@ +import { DomainEvent } from '@libs/ddd/domain-event.base'; +import { + EventEnvelopeFilter, + EventStreamFilter, + IEventStore, +} from '@libs/event-sourcing/interfaces/event-store.interface'; +import { EventEnvelope } from '@libs/event-sourcing/models/event-envelope'; +import { EventStream } from '@libs/event-sourcing/models/event-stream'; +import { EventStoreVersionConflictException } from '@libs/event-sourcing/exceptions/event-store-version-conflict.exception'; +import { EventMap } from '@libs/event-sourcing/services/event-map'; + +export class InMemoryEventStore implements IEventStore { + /** streamId → ordered list of envelopes (version is 1-based index) */ + private readonly streams = new Map(); + + constructor(private readonly eventMap?: EventMap) {} + + async connect(): Promise {} + async disconnect(): Promise {} + async ensureCollection(_pool?: string): Promise {} + + async appendEvents( + stream: EventStream, + expectedVersion: number, + events: DomainEvent[], + _pool?: string, + ): Promise { + if (events.length === 0) return []; + + const stored = this.streams.get(stream.streamId) ?? []; + const currentVersion = stored.length; + + if (currentVersion !== expectedVersion) { + throw new EventStoreVersionConflictException( + stream.streamId, + expectedVersion, + currentVersion, + ); + } + + const newEnvelopes: EventEnvelope[] = events.map((event, i) => + EventEnvelope.fromDomainEvent(event, currentVersion + i + 1), + ); + + this.streams.set(stream.streamId, [...stored, ...newEnvelopes]); + return newEnvelopes; + } + + async *getEvents( + stream: EventStream, + filter?: EventStreamFilter, + ): AsyncGenerator { + const stored = this.streams.get(stream.streamId) ?? []; + const fromVersion = filter?.fromVersion ?? 1; + + for (const envelope of stored) { + if (envelope.metadata.version >= fromVersion) { + yield this.deserializeEnvelope(envelope); + } + } + } + + async getAllEnvelopes(filter: EventEnvelopeFilter): Promise { + const result: EventEnvelope[] = []; + + for (const envelopes of this.streams.values()) { + for (const envelope of envelopes) { + const { occurredOn } = envelope.metadata; + if (filter.from && occurredOn < filter.from) continue; + if (filter.to && occurredOn > filter.to) continue; + result.push(envelope); + } + } + + return result.sort( + (a, b) => + a.metadata.occurredOn.getTime() - b.metadata.occurredOn.getTime(), + ); + } + + /** Used in tests to directly inspect stored envelopes */ + getStoredEnvelopes(streamId: string): EventEnvelope[] { + return this.streams.get(streamId) ?? []; + } + + private deserializeEnvelope(envelope: EventEnvelope): DomainEvent { + if (this.eventMap) { + return this.eventMap.deserialize(envelope.event, envelope.payload); + } + return envelope.payload as unknown as DomainEvent; + } +} diff --git a/src/infrastructure/event-sourcing/drivers/in-memory/in-memory.snapshot-store.ts b/src/infrastructure/event-sourcing/drivers/in-memory/in-memory.snapshot-store.ts new file mode 100644 index 00000000..a62c3f7a --- /dev/null +++ b/src/infrastructure/event-sourcing/drivers/in-memory/in-memory.snapshot-store.ts @@ -0,0 +1,38 @@ +import { ISnapshotStore } from '@libs/event-sourcing/interfaces/snapshot-store.interface'; +import { EventStream } from '@libs/event-sourcing/models/event-stream'; +import { Snapshot } from '@libs/event-sourcing/models/snapshot'; + +export class InMemorySnapshotStore implements ISnapshotStore { + private readonly snapshots = new Map(); + + async connect(): Promise {} + async disconnect(): Promise {} + async ensureCollection(_pool?: string): Promise {} + + async appendSnapshot( + stream: EventStream, + version: number, + payload: Record, + _pool?: string, + ): Promise { + const existing = this.snapshots.get(stream.streamId) ?? []; + const snapshot = new Snapshot({ + aggregateId: stream.aggregateId, + aggregateVersion: version, + payload, + }); + this.snapshots.set(stream.streamId, [...existing, snapshot]); + } + + async getLastSnapshot( + stream: EventStream, + _pool?: string, + ): Promise { + const stored = this.snapshots.get(stream.streamId); + if (!stored || stored.length === 0) return null; + + return stored.reduce((latest, current) => + current.aggregateVersion > latest.aggregateVersion ? current : latest, + ); + } +} diff --git a/src/infrastructure/event-sourcing/drivers/postgres/postgres.event-store.ts b/src/infrastructure/event-sourcing/drivers/postgres/postgres.event-store.ts new file mode 100644 index 00000000..18af4290 --- /dev/null +++ b/src/infrastructure/event-sourcing/drivers/postgres/postgres.event-store.ts @@ -0,0 +1,142 @@ +import { Injectable } from '@nestjs/common'; +import { InjectPool } from 'nestjs-slonik'; +import { DatabasePool, sql } from 'slonik'; +import { DomainEvent } from '@libs/ddd/domain-event.base'; +import { + EventEnvelopeFilter, + EventStreamFilter, + IEventStore, +} from '@libs/event-sourcing/interfaces/event-store.interface'; +import { EventEnvelope } from '@libs/event-sourcing/models/event-envelope'; +import { EventStream } from '@libs/event-sourcing/models/event-stream'; +import { EventMap } from '@libs/event-sourcing/services/event-map'; +import { EventStoreVersionConflictException } from '@libs/event-sourcing/exceptions/event-store-version-conflict.exception'; + +@Injectable() +export class PostgresEventStore implements IEventStore { + constructor( + @InjectPool() private readonly pool: DatabasePool, + private readonly eventMap: EventMap, + ) {} + + // eslint-disable-next-line @typescript-eslint/no-empty-function + async connect(): Promise {} + + // eslint-disable-next-line @typescript-eslint/no-empty-function + async disconnect(): Promise {} + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async ensureCollection(_pool?: string): Promise { + // Schema applied via schema.sql migration + } + + async appendEvents( + stream: EventStream, + expectedVersion: number, + events: DomainEvent[], + pool = 'default', + ): Promise { + if (events.length === 0) return []; + + const envelopes: EventEnvelope[] = []; + let nextVersion = expectedVersion + 1; + + await this.pool.transaction(async (tx) => { + // Check current stream version inside the transaction + const versionRow = await tx.maybeOne(sql` + SELECT MAX(version) AS current_version + FROM event_store + WHERE stream_id = ${stream.streamId} + AND pool = ${pool} + `); + + const currentVersion = + (versionRow?.current_version as number | null) ?? 0; + if (currentVersion !== expectedVersion) { + throw new EventStoreVersionConflictException( + stream.streamId, + expectedVersion, + currentVersion, + ); + } + + for (const event of events) { + const envelope = EventEnvelope.fromDomainEvent(event, nextVersion); + envelopes.push(envelope); + + await tx.query(sql` + INSERT INTO event_store + (stream_id, aggregate_id, event, payload, version, occurred_on, + correlation_id, causation_id, pool) + VALUES ( + ${stream.streamId}, + ${envelope.metadata.aggregateId}, + ${envelope.event}, + ${JSON.stringify(envelope.payload)}, + ${nextVersion}, + ${envelope.metadata.occurredOn.toISOString()}::timestamptz, + ${envelope.metadata.correlationId ?? null}, + ${envelope.metadata.causationId ?? null}, + ${pool} + ) + `); + nextVersion += 1; + } + }); + + return envelopes; + } + + async *getEvents( + stream: EventStream, + filter?: EventStreamFilter, + ): AsyncGenerator { + const fromVersion = filter?.fromVersion ?? 1; + + const rows = await this.pool.any(sql` + SELECT event, payload, version, occurred_on, correlation_id, causation_id, aggregate_id + FROM event_store + WHERE stream_id = ${stream.streamId} + AND version >= ${fromVersion} + ORDER BY version ASC + `); + + for (const row of rows) { + yield this.eventMap.deserialize( + row.event as string, + row.payload as unknown as Record, + ); + } + } + + async getAllEnvelopes(filter: EventEnvelopeFilter): Promise { + const poolName = filter.pool ?? 'default'; + const from = filter.from ?? new Date(0); + const to = filter.to ?? new Date(); + + const rows = await this.pool.any(sql` + SELECT event, payload, id AS event_id, aggregate_id, version, + occurred_on, correlation_id, causation_id + FROM event_store + WHERE occurred_on >= ${from.toISOString()}::timestamptz + AND occurred_on <= ${to.toISOString()}::timestamptz + AND pool = ${poolName} + ORDER BY occurred_on ASC + `); + + return rows.map((row) => + EventEnvelope.restore({ + event: row.event as string, + payload: row.payload as unknown as Record, + metadata: { + eventId: row.event_id as string, + aggregateId: row.aggregate_id as string, + version: row.version as number, + occurredOn: new Date(row.occurred_on as string), + correlationId: row.correlation_id as string | undefined, + causationId: row.causation_id as string | undefined, + }, + }), + ); + } +} diff --git a/src/infrastructure/event-sourcing/drivers/postgres/postgres.snapshot-store.ts b/src/infrastructure/event-sourcing/drivers/postgres/postgres.snapshot-store.ts new file mode 100644 index 00000000..568383e9 --- /dev/null +++ b/src/infrastructure/event-sourcing/drivers/postgres/postgres.snapshot-store.ts @@ -0,0 +1,66 @@ +import { Injectable } from '@nestjs/common'; +import { InjectPool } from 'nestjs-slonik'; +import { DatabasePool, sql } from 'slonik'; +import { ISnapshotStore } from '@libs/event-sourcing/interfaces/snapshot-store.interface'; +import { EventStream } from '@libs/event-sourcing/models/event-stream'; +import { Snapshot } from '@libs/event-sourcing/models/snapshot'; + +@Injectable() +export class PostgresSnapshotStore implements ISnapshotStore { + constructor(@InjectPool() private readonly pool: DatabasePool) {} + + // eslint-disable-next-line @typescript-eslint/no-empty-function + async connect(): Promise {} + + // eslint-disable-next-line @typescript-eslint/no-empty-function + async disconnect(): Promise {} + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async ensureCollection(_pool?: string): Promise { + // Schema applied via schema.sql migration + } + + async appendSnapshot( + stream: EventStream, + version: number, + payload: Record, + pool = 'default', + ): Promise { + await this.pool.query(sql` + INSERT INTO snapshot_store + (stream_id, aggregate_id, aggregate_version, payload, occurred_on, pool) + VALUES ( + ${stream.streamId}, + ${stream.aggregateId}, + ${version}, + ${JSON.stringify(payload)}, + NOW(), + ${pool} + ) + `); + } + + async getLastSnapshot( + stream: EventStream, + pool = 'default', + ): Promise { + const result = await this.pool.maybeOne(sql` + SELECT id, aggregate_id, aggregate_version, payload, occurred_on + FROM snapshot_store + WHERE stream_id = ${stream.streamId} + AND pool = ${pool} + ORDER BY aggregate_version DESC + LIMIT 1 + `); + + if (!result) return null; + + return new Snapshot({ + snapshotId: result.id as string, + aggregateId: result.aggregate_id as string, + aggregateVersion: result.aggregate_version as number, + payload: result.payload as unknown as Record, + occurredOn: new Date(result.occurred_on as string), + }); + } +} diff --git a/src/infrastructure/event-sourcing/drivers/postgres/schema.sql b/src/infrastructure/event-sourcing/drivers/postgres/schema.sql new file mode 100644 index 00000000..5f234b3b --- /dev/null +++ b/src/infrastructure/event-sourcing/drivers/postgres/schema.sql @@ -0,0 +1,39 @@ +-- Event Store: append-only log of all domain events +CREATE TABLE IF NOT EXISTS event_store ( + id UUID NOT NULL DEFAULT gen_random_uuid(), + stream_id VARCHAR(500) NOT NULL, + aggregate_id VARCHAR(500) NOT NULL, + event VARCHAR(500) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + version INTEGER NOT NULL, + occurred_on TIMESTAMPTZ NOT NULL DEFAULT NOW(), + correlation_id VARCHAR(500), + causation_id VARCHAR(500), + pool VARCHAR(100) NOT NULL DEFAULT 'default', + + PRIMARY KEY (stream_id, version), + CONSTRAINT event_store_version_positive CHECK (version > 0) +); + +CREATE INDEX IF NOT EXISTS idx_event_store_stream_id + ON event_store (stream_id, version); + +CREATE INDEX IF NOT EXISTS idx_event_store_aggregate_id + ON event_store (aggregate_id); + +CREATE INDEX IF NOT EXISTS idx_event_store_occurred_on + ON event_store (occurred_on); + +-- Snapshot Store: point-in-time aggregate state captures +CREATE TABLE IF NOT EXISTS snapshot_store ( + id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, + stream_id VARCHAR(500) NOT NULL, + aggregate_id VARCHAR(500) NOT NULL, + aggregate_version INTEGER NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + occurred_on TIMESTAMPTZ NOT NULL DEFAULT NOW(), + pool VARCHAR(100) NOT NULL DEFAULT 'default' +); + +CREATE INDEX IF NOT EXISTS idx_snapshot_store_stream_version + ON snapshot_store (stream_id, aggregate_version DESC); diff --git a/src/infrastructure/event-sourcing/index.ts b/src/infrastructure/event-sourcing/index.ts new file mode 100644 index 00000000..8005f8d1 --- /dev/null +++ b/src/infrastructure/event-sourcing/index.ts @@ -0,0 +1,7 @@ +// ─── In-Memory Adapters (tests / development) ──────────────────────────────── +export { InMemoryEventStore } from './drivers/in-memory/in-memory.event-store'; +export { InMemorySnapshotStore } from './drivers/in-memory/in-memory.snapshot-store'; + +// ─── PostgreSQL Adapters (production) ──────────────────────────────────────── +export { PostgresEventStore } from './drivers/postgres/postgres.event-store'; +export { PostgresSnapshotStore } from './drivers/postgres/postgres.snapshot-store'; diff --git a/src/libs/ddd/aggregate-root.base.ts b/src/libs/ddd/aggregate-root.base.ts index 02485851..8fb95502 100644 --- a/src/libs/ddd/aggregate-root.base.ts +++ b/src/libs/ddd/aggregate-root.base.ts @@ -1,12 +1,37 @@ import { DomainEvent } from './domain-event.base'; -import { Entity } from './entity.base'; +import { Entity, CreateEntityProps, AggregateID } from './entity.base'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { LoggerPort } from '@libs/ports/logger.port'; import { RequestContextService } from '../application/context/AppRequestContext'; -export abstract class AggregateRoot extends Entity { +const EVENT_HANDLER_METADATA = 'event-sourcing:event-handler'; + +/** + * AggregateRoot base class. + * + * Supports two construction modes: + * + * 1. **Traditional** (existing aggregates — UserEntity, WalletEntity): + * Call `super({ id, props })` — the full Entity lifecycle applies. + * + * 2. **Event-sourced** (new aggregates using applyEvent/loadFromHistory): + * Call `super()` with no args — the aggregate starts empty and its + * state is built by replaying domain events via `loadFromHistory()`. + * Concrete event-sourced aggregates must declare `protected _id = ''` + * and `validate() {}` as a no-op. + */ +export abstract class AggregateRoot< + EntityProps = any, +> extends Entity { private _domainEvents: DomainEvent[] = []; + /** Current event-sourcing version — incremented by applyEvent() */ + version = 0; + + constructor(props?: CreateEntityProps) { + super(props ?? ({ __eventSourcing: true } as any)); + } + get domainEvents(): DomainEvent[] { return this._domainEvents; } @@ -37,4 +62,71 @@ export abstract class AggregateRoot extends Entity { ); this.clearEvents(); } + + /** + * Apply a domain event to this aggregate. + * + * - Dispatches to the @EventHandler-decorated method matching the event class. + * - When fromHistory=false (default), adds the event to the uncommitted list. + * - Always increments version. + */ + public applyEvent(event: DomainEvent, fromHistory = false): void { + this.dispatchToHandler(event); + if (!fromHistory) { + this.addEvent(event); + } + this.version += 1; + } + + /** + * Returns all uncommitted events and clears the internal list. + * Called by EventStoreRepositoryBase.save() before appending to the store. + */ + public commit(): DomainEvent[] { + const events = [...this._domainEvents]; + this.clearEvents(); + return events; + } + + /** + * Replay historical events to reconstruct aggregate state. + * Calls applyEvent(event, true) for each event — version increments, + * events are NOT added to the uncommitted list. + */ + public async loadFromHistory( + cursor: AsyncIterable, + ): Promise { + for await (const event of cursor) { + this.applyEvent(event, true); + } + } + + private dispatchToHandler(event: DomainEvent): void { + const proto = Object.getPrototypeOf(this); + const methodNames = this.getAllMethodNames(proto); + + for (const methodName of methodNames) { + const registeredClass = Reflect.getMetadata( + EVENT_HANDLER_METADATA, + proto, + methodName, + ); + if (registeredClass && event instanceof registeredClass) { + (this as any)[methodName](event); + return; + } + } + } + + private getAllMethodNames(proto: any): string[] { + const names: string[] = []; + let current = proto; + while (current && current !== Object.prototype) { + names.push(...Object.getOwnPropertyNames(current)); + current = Object.getPrototypeOf(current); + } + return [...new Set(names)]; + } } + +export { AggregateID }; diff --git a/src/libs/ddd/entity.base.ts b/src/libs/ddd/entity.base.ts index b946f43e..a2a8081e 100644 --- a/src/libs/ddd/entity.base.ts +++ b/src/libs/ddd/entity.base.ts @@ -21,13 +21,21 @@ export interface CreateEntityProps { updatedAt?: Date; } +/** Sentinel passed by AggregateRoot when creating an event-sourced aggregate with no props */ +export const EVENT_SOURCING_SENTINEL = '__eventSourcing'; + export abstract class Entity { - constructor({ - id, - createdAt, - updatedAt, - props, - }: CreateEntityProps) { + constructor( + propsOrSentinel: CreateEntityProps | { __eventSourcing: true }, + ) { + if (EVENT_SOURCING_SENTINEL in propsOrSentinel) { + // Event-sourcing mode: state is built by replaying events. + // Skip Entity initialization — subclass sets _id and props via event handlers. + return; + } + + const { id, createdAt, updatedAt, props } = + propsOrSentinel as CreateEntityProps; this.setId(id); this.validateProps(props); const now = new Date(); diff --git a/src/libs/event-sourcing/decorators/aggregate.decorator.ts b/src/libs/event-sourcing/decorators/aggregate.decorator.ts new file mode 100644 index 00000000..36715ad8 --- /dev/null +++ b/src/libs/event-sourcing/decorators/aggregate.decorator.ts @@ -0,0 +1,7 @@ +export const AGGREGATE_NAME_METADATA = 'aggregate:name'; + +export function Aggregate(name: string): ClassDecorator { + return (target) => { + Reflect.defineMetadata(AGGREGATE_NAME_METADATA, name, target); + }; +} diff --git a/src/libs/event-sourcing/decorators/event-handler.decorator.ts b/src/libs/event-sourcing/decorators/event-handler.decorator.ts new file mode 100644 index 00000000..c47cf7ed --- /dev/null +++ b/src/libs/event-sourcing/decorators/event-handler.decorator.ts @@ -0,0 +1,18 @@ +import { DomainEvent } from '@libs/ddd/domain-event.base'; + +export const EVENT_HANDLER_METADATA = 'event-sourcing:event-handler'; + +export type Constructor = new (...args: any[]) => T; + +export function EventHandler( + eventClass: Constructor, +): MethodDecorator { + return (target, propertyKey) => { + Reflect.defineMetadata( + EVENT_HANDLER_METADATA, + eventClass, + target, + propertyKey, + ); + }; +} diff --git a/src/libs/event-sourcing/decorators/event.decorator.ts b/src/libs/event-sourcing/decorators/event.decorator.ts new file mode 100644 index 00000000..896579c6 --- /dev/null +++ b/src/libs/event-sourcing/decorators/event.decorator.ts @@ -0,0 +1,7 @@ +export const EVENT_NAME_METADATA = 'event:name'; + +export function Event(name: string): ClassDecorator { + return (target) => { + Reflect.defineMetadata(EVENT_NAME_METADATA, name, target); + }; +} diff --git a/src/libs/event-sourcing/event-sourcing.module.ts b/src/libs/event-sourcing/event-sourcing.module.ts new file mode 100644 index 00000000..8d514027 --- /dev/null +++ b/src/libs/event-sourcing/event-sourcing.module.ts @@ -0,0 +1,99 @@ +import { + DynamicModule, + Module, + OnModuleDestroy, + OnModuleInit, + Provider, + Inject, +} from '@nestjs/common'; +import { IEventStore } from './interfaces/event-store.interface'; +import { ISnapshotStore } from './interfaces/snapshot-store.interface'; +import { EventMap } from './services/event-map'; +import { EventBus } from './services/event-bus'; +import { DomainEvent } from '@libs/ddd/domain-event.base'; + +export const EVENT_STORE = Symbol('EVENT_STORE'); +export const SNAPSHOT_STORE = Symbol('SNAPSHOT_STORE'); +export const EVENT_BUS = Symbol('EVENT_BUS'); + +export interface EventSourcingOptions { + /** + * Provider for the IEventStore implementation. + * Must bind to the EVENT_STORE token. + * Example: { provide: EVENT_STORE, useClass: PostgresEventStore } + */ + eventStore: Provider; + /** + * Provider for the ISnapshotStore implementation. + * Must bind to the SNAPSHOT_STORE token. + * Example: { provide: SNAPSHOT_STORE, useClass: PostgresSnapshotStore } + */ + snapshotStore: Provider; +} + +@Module({}) +export class EventSourcingModule implements OnModuleInit, OnModuleDestroy { + constructor( + @Inject(EVENT_STORE) private readonly eventStore: IEventStore, + @Inject(SNAPSHOT_STORE) private readonly snapshotStore: ISnapshotStore, + ) {} + + async onModuleInit(): Promise { + await this.eventStore.connect(); + await this.snapshotStore.connect(); + await this.eventStore.ensureCollection(); + await this.snapshotStore.ensureCollection(); + } + + async onModuleDestroy(): Promise { + await this.eventStore.disconnect(); + await this.snapshotStore.disconnect(); + } + + /** + * Configure the event-sourcing module with concrete adapter implementations. + * The caller is responsible for providing EVENT_STORE and SNAPSHOT_STORE adapters + * from the infrastructure layer — this module has no knowledge of Slonik or + * in-memory implementations. + * + * @example + * EventSourcingModule.forRoot({ + * eventStore: { provide: EVENT_STORE, useClass: PostgresEventStore }, + * snapshotStore: { provide: SNAPSHOT_STORE, useClass: PostgresSnapshotStore }, + * }) + */ + static forRoot(options: EventSourcingOptions): DynamicModule { + return { + global: true, + module: EventSourcingModule, + providers: [ + EventMap, + EventBus, + { provide: EVENT_BUS, useExisting: EventBus }, + options.eventStore, + options.snapshotStore, + ], + exports: [EVENT_STORE, SNAPSHOT_STORE, EVENT_BUS, EventMap, EventBus], + }; + } + + static forFeature(options: { + events: Array DomainEvent>; + }): DynamicModule { + return { + module: EventSourcingModule, + providers: [ + { + provide: 'EVENT_SOURCING_FEATURE_EVENTS', + useFactory: (eventMap: EventMap) => { + for (const eventClass of options.events) { + eventMap.register(eventClass); + } + return options.events; + }, + inject: [EventMap], + }, + ], + }; + } +} diff --git a/src/libs/event-sourcing/exceptions/aggregate-not-found.exception.ts b/src/libs/event-sourcing/exceptions/aggregate-not-found.exception.ts new file mode 100644 index 00000000..caf25a57 --- /dev/null +++ b/src/libs/event-sourcing/exceptions/aggregate-not-found.exception.ts @@ -0,0 +1,11 @@ +import { ExceptionBase } from '@libs/exceptions/exception.base'; + +export class AggregateNotFoundException extends ExceptionBase { + readonly code = 'EVENT_STORE.AGGREGATE_NOT_FOUND'; + + constructor(aggregateId: string) { + super(`Aggregate with id "${aggregateId}" was not found`, undefined, { + aggregateId, + }); + } +} diff --git a/src/libs/event-sourcing/exceptions/event-store-version-conflict.exception.ts b/src/libs/event-sourcing/exceptions/event-store-version-conflict.exception.ts new file mode 100644 index 00000000..98b5378c --- /dev/null +++ b/src/libs/event-sourcing/exceptions/event-store-version-conflict.exception.ts @@ -0,0 +1,17 @@ +import { ExceptionBase } from '@libs/exceptions/exception.base'; + +export class EventStoreVersionConflictException extends ExceptionBase { + readonly code = 'EVENT_STORE.VERSION_CONFLICT'; + + constructor( + streamId: string, + expectedVersion: number, + actualVersion: number, + ) { + super( + `Version conflict on stream "${streamId}": expected ${expectedVersion}, got ${actualVersion}`, + undefined, + { streamId, expectedVersion, actualVersion }, + ); + } +} diff --git a/src/libs/event-sourcing/exceptions/unregistered-event.exception.ts b/src/libs/event-sourcing/exceptions/unregistered-event.exception.ts new file mode 100644 index 00000000..3a369277 --- /dev/null +++ b/src/libs/event-sourcing/exceptions/unregistered-event.exception.ts @@ -0,0 +1,14 @@ +import { ExceptionBase } from '@libs/exceptions/exception.base'; + +export class UnregisteredEventException extends ExceptionBase { + readonly code = 'EVENT_STORE.UNREGISTERED_EVENT'; + + constructor(eventName: string) { + super( + `Event "${eventName}" is not registered in the EventMap. ` + + `Register it via EventSourcingModule.forFeature({ events: [...] }).`, + undefined, + { eventName }, + ); + } +} diff --git a/src/libs/event-sourcing/index.ts b/src/libs/event-sourcing/index.ts new file mode 100644 index 00000000..681b6f59 --- /dev/null +++ b/src/libs/event-sourcing/index.ts @@ -0,0 +1,44 @@ +// ─── Decorators ────────────────────────────────────────────────────────────── +export { Aggregate } from './decorators/aggregate.decorator'; +export { Event } from './decorators/event.decorator'; +export { EventHandler } from './decorators/event-handler.decorator'; + +// ─── Models ────────────────────────────────────────────────────────────────── +export { EventEnvelope } from './models/event-envelope'; +export type { EventEnvelopeMetadata } from './models/event-envelope'; +export { EventStream } from './models/event-stream'; +export { Snapshot } from './models/snapshot'; + +// ─── Interfaces ────────────────────────────────────────────────────────────── +export type { IEventPayload } from './interfaces/event-payload.type'; +export type { IEventSerializer } from './interfaces/event-serializer.interface'; +export type { + IEventStore, + EventStreamFilter, + EventEnvelopeFilter, +} from './interfaces/event-store.interface'; +export type { ISnapshotStore } from './interfaces/snapshot-store.interface'; + +// ─── Services ──────────────────────────────────────────────────────────────── +export { EventMap } from './services/event-map'; +export { EventBus } from './services/event-bus'; + +// ─── Repository ────────────────────────────────────────────────────────────── +export { EventStoreRepositoryBase } from './repository/event-store-repository.base'; +export type { EventStoreRepositoryPort } from './repository/event-store-repository.port'; + +// ─── Exceptions ────────────────────────────────────────────────────────────── +export { EventStoreVersionConflictException } from './exceptions/event-store-version-conflict.exception'; +export { AggregateNotFoundException } from './exceptions/aggregate-not-found.exception'; +export { UnregisteredEventException } from './exceptions/unregistered-event.exception'; + +// ─── Module & DI tokens ────────────────────────────────────────────────────── +// NOTE: Concrete drivers (InMemoryEventStore, PostgresEventStore, etc.) are NOT +// exported from this pure library. Import them from @infrastructure/event-sourcing. +export { + EventSourcingModule, + EVENT_STORE, + SNAPSHOT_STORE, + EVENT_BUS, +} from './event-sourcing.module'; +export type { EventSourcingOptions } from './event-sourcing.module'; diff --git a/src/libs/event-sourcing/interfaces/event-payload.type.ts b/src/libs/event-sourcing/interfaces/event-payload.type.ts new file mode 100644 index 00000000..c50aa938 --- /dev/null +++ b/src/libs/event-sourcing/interfaces/event-payload.type.ts @@ -0,0 +1 @@ +export type IEventPayload = Record; diff --git a/src/libs/event-sourcing/interfaces/event-serializer.interface.ts b/src/libs/event-sourcing/interfaces/event-serializer.interface.ts new file mode 100644 index 00000000..be0f6f02 --- /dev/null +++ b/src/libs/event-sourcing/interfaces/event-serializer.interface.ts @@ -0,0 +1,7 @@ +import { DomainEvent } from '@libs/ddd/domain-event.base'; +import { IEventPayload } from './event-payload.type'; + +export interface IEventSerializer { + serialize(event: E): IEventPayload; + deserialize(payload: IEventPayload): E; +} diff --git a/src/libs/event-sourcing/interfaces/event-store.interface.ts b/src/libs/event-sourcing/interfaces/event-store.interface.ts new file mode 100644 index 00000000..33252629 --- /dev/null +++ b/src/libs/event-sourcing/interfaces/event-store.interface.ts @@ -0,0 +1,30 @@ +import { DomainEvent } from '@libs/ddd/domain-event.base'; +import { EventEnvelope } from '../models/event-envelope'; +import { EventStream } from '../models/event-stream'; + +export interface EventStreamFilter { + fromVersion?: number; +} + +export interface EventEnvelopeFilter { + from?: Date; + to?: Date; + pool?: string; +} + +export interface IEventStore { + connect(): Promise; + disconnect(): Promise; + ensureCollection(pool?: string): Promise; + appendEvents( + stream: EventStream, + expectedVersion: number, + events: DomainEvent[], + pool?: string, + ): Promise; + getEvents( + stream: EventStream, + filter?: EventStreamFilter, + ): AsyncGenerator; + getAllEnvelopes(filter: EventEnvelopeFilter): Promise; +} diff --git a/src/libs/event-sourcing/interfaces/snapshot-store.interface.ts b/src/libs/event-sourcing/interfaces/snapshot-store.interface.ts new file mode 100644 index 00000000..66d814e4 --- /dev/null +++ b/src/libs/event-sourcing/interfaces/snapshot-store.interface.ts @@ -0,0 +1,15 @@ +import { Snapshot } from '../models/snapshot'; +import { EventStream } from '../models/event-stream'; + +export interface ISnapshotStore { + connect(): Promise; + disconnect(): Promise; + ensureCollection(pool?: string): Promise; + appendSnapshot( + stream: EventStream, + version: number, + payload: Record, + pool?: string, + ): Promise; + getLastSnapshot(stream: EventStream, pool?: string): Promise; +} diff --git a/src/libs/event-sourcing/models/event-envelope.ts b/src/libs/event-sourcing/models/event-envelope.ts new file mode 100644 index 00000000..f6c1a7c7 --- /dev/null +++ b/src/libs/event-sourcing/models/event-envelope.ts @@ -0,0 +1,56 @@ +import { DomainEvent } from '@libs/ddd/domain-event.base'; +import { EVENT_NAME_METADATA } from '../decorators/event.decorator'; +import { IEventPayload } from '../interfaces/event-payload.type'; + +export interface EventEnvelopeMetadata { + readonly eventId: string; + readonly aggregateId: string; + readonly version: number; + readonly occurredOn: Date; + readonly correlationId?: string; + readonly causationId?: string; +} + +export class EventEnvelope { + readonly event: string; + readonly payload: IEventPayload; + readonly metadata: EventEnvelopeMetadata; + + private constructor( + event: string, + payload: IEventPayload, + metadata: EventEnvelopeMetadata, + ) { + this.event = event; + this.payload = payload; + this.metadata = metadata; + } + + static fromDomainEvent( + domainEvent: DomainEvent, + version: number, + ): EventEnvelope { + const eventName: string = + Reflect.getMetadata(EVENT_NAME_METADATA, domainEvent.constructor) ?? + domainEvent.constructor.name; + + const payload: IEventPayload = { ...domainEvent } as IEventPayload; + + return new EventEnvelope(eventName, payload, { + eventId: domainEvent.id, + aggregateId: domainEvent.aggregateId, + version, + occurredOn: new Date(domainEvent.metadata.timestamp), + correlationId: domainEvent.metadata.correlationId, + causationId: domainEvent.metadata.causationId, + }); + } + + static restore(props: { + event: string; + payload: IEventPayload; + metadata: EventEnvelopeMetadata; + }): EventEnvelope { + return new EventEnvelope(props.event, props.payload, props.metadata); + } +} diff --git a/src/libs/event-sourcing/models/event-stream.ts b/src/libs/event-sourcing/models/event-stream.ts new file mode 100644 index 00000000..23c262de --- /dev/null +++ b/src/libs/event-sourcing/models/event-stream.ts @@ -0,0 +1,15 @@ +export class EventStream { + readonly streamId: string; + readonly aggregateType: string; + readonly aggregateId: string; + + private constructor(aggregateType: string, aggregateId: string) { + this.aggregateType = aggregateType; + this.aggregateId = aggregateId; + this.streamId = `${aggregateType}-${aggregateId}`; + } + + static for(aggregateType: string, aggregateId: string): EventStream { + return new EventStream(aggregateType, aggregateId); + } +} diff --git a/src/libs/event-sourcing/models/snapshot.ts b/src/libs/event-sourcing/models/snapshot.ts new file mode 100644 index 00000000..c8e42685 --- /dev/null +++ b/src/libs/event-sourcing/models/snapshot.ts @@ -0,0 +1,23 @@ +import { randomUUID } from 'crypto'; + +export class Snapshot { + readonly snapshotId: string; + readonly aggregateId: string; + readonly aggregateVersion: number; + readonly payload: Record; + readonly occurredOn: Date; + + constructor(props: { + aggregateId: string; + aggregateVersion: number; + payload: Record; + occurredOn?: Date; + snapshotId?: string; + }) { + this.snapshotId = props.snapshotId ?? randomUUID(); + this.aggregateId = props.aggregateId; + this.aggregateVersion = props.aggregateVersion; + this.payload = props.payload; + this.occurredOn = props.occurredOn ?? new Date(); + } +} diff --git a/src/libs/event-sourcing/repository/event-store-repository.base.ts b/src/libs/event-sourcing/repository/event-store-repository.base.ts new file mode 100644 index 00000000..47e0752a --- /dev/null +++ b/src/libs/event-sourcing/repository/event-store-repository.base.ts @@ -0,0 +1,171 @@ +import { AggregateRoot } from '@libs/ddd/aggregate-root.base'; +import { LoggerPort } from '@libs/ports/logger.port'; +import { AGGREGATE_NAME_METADATA } from '../decorators/aggregate.decorator'; +import { IEventStore } from '../interfaces/event-store.interface'; +import { ISnapshotStore } from '../interfaces/snapshot-store.interface'; +import { EventEnvelope } from '../models/event-envelope'; +import { EventStream } from '../models/event-stream'; +import { Snapshot } from '../models/snapshot'; +import { EventBus } from '../services/event-bus'; +import { EventMap } from '../services/event-map'; +import { + EventStoreRepositoryPort, + EventEnvelopeFilter, +} from './event-store-repository.port'; + +type Constructor = new (...args: any[]) => A; + +export abstract class EventStoreRepositoryBase + implements EventStoreRepositoryPort +{ + /** Write a snapshot every N events (override per subclass, default 10) */ + protected readonly snapshotInterval: number = 10; + + /** Optional named event pool — override per subclass */ + protected readonly poolName?: string; + + constructor( + protected readonly eventStore: IEventStore, + protected readonly snapshotStore: ISnapshotStore, + private readonly aggregateConstructor: Constructor, + protected readonly eventBus: EventBus, + protected readonly logger: LoggerPort, + protected readonly eventMap: EventMap, + ) {} + + // ─── save() ──────────────────────────────────────────────────────────────── + + async save(aggregate: A): Promise { + const events = aggregate.commit(); + + // FR-009 — no-op guard + if (events.length === 0) return; + + const stream = this.streamFor(aggregate); + const expectedVersion = aggregate.version - events.length; + + // Append events (FR-001, FR-003) + const envelopes = await this.eventStore.appendEvents( + stream, + expectedVersion, + events, + this.poolName, + ); + + this.logger.debug( + `[EventStoreRepository] save — stream: ${stream.streamId}, events: ${events.length}, version: ${aggregate.version}`, + ); + + // FR-005 — snapshot on save at configured interval + if (aggregate.version % this.snapshotInterval === 0) { + await this.snapshotStore.appendSnapshot( + stream, + aggregate.version, + this.toSnapshotPayload(aggregate), + this.poolName, + ); + } + + // FR-004 — publish envelopes; swallow publish errors + try { + this.eventBus.publish(envelopes); + } catch (err: any) { + this.logger.error( + `[EventStoreRepository] EventBus.publish failed for stream ${stream.streamId}: ${err?.message}`, + err, + { streamId: stream.streamId }, + ); + } + } + + // ─── findById() ──────────────────────────────────────────────────────────── + + async findById(id: string): Promise { + const aggregateType = this.getAggregateName(); + const stream = EventStream.for(aggregateType, id); + + const snapshot = await this.snapshotStore.getLastSnapshot( + stream, + this.poolName, + ); + + let aggregate: A; + let fromVersion = 1; + let snapshotUsed = false; + + if (snapshot) { + aggregate = this.fromSnapshot(snapshot); + fromVersion = snapshot.aggregateVersion + 1; + snapshotUsed = true; + } else { + aggregate = new this.aggregateConstructor(); + } + + let eventsReplayed = 0; + const cursor = this.eventStore.getEvents(stream, { fromVersion }); + + const events: import('@libs/ddd/domain-event.base').DomainEvent[] = []; + for await (const event of cursor) { + events.push(event); + eventsReplayed += 1; + } + + if (!snapshot && eventsReplayed === 0) { + return null; + } + + await aggregate.loadFromHistory( + (async function* () { + for (const e of events) yield e; + })(), + ); + + this.logger.debug( + `[EventStoreRepository] findById — stream: ${stream.streamId}, snapshotUsed: ${snapshotUsed}, eventsReplayed: ${eventsReplayed}`, + ); + + return aggregate; + } + + // ─── findEnvelopes() ─────────────────────────────────────────────────────── + + async findEnvelopes(filter: EventEnvelopeFilter): Promise { + return this.eventStore.getAllEnvelopes(filter); + } + + // ─── Helpers ─────────────────────────────────────────────────────────────── + + private streamFor(aggregate: A): EventStream { + const aggregateType = this.getAggregateName(); + return EventStream.for(aggregateType, (aggregate as any).id ?? ''); + } + + private getAggregateName(): string { + return ( + Reflect.getMetadata(AGGREGATE_NAME_METADATA, this.aggregateConstructor) ?? + this.aggregateConstructor.name + ); + } + + /** + * Serialize aggregate state for snapshot storage. + * Override in concrete repository to persist custom fields. + */ + protected toSnapshotPayload(aggregate: A): Record { + return { + ...(aggregate as unknown as Record), + version: aggregate.version, + }; + } + + /** + * Restore an aggregate instance from a snapshot. + * Override in concrete repository to restore custom fields. + */ + protected fromSnapshot(snapshot: Snapshot): A { + const aggregate = new this.aggregateConstructor(); + Object.assign(aggregate, snapshot.payload); + aggregate.version = snapshot.aggregateVersion; + return aggregate; + } +} diff --git a/src/libs/event-sourcing/repository/event-store-repository.port.ts b/src/libs/event-sourcing/repository/event-store-repository.port.ts new file mode 100644 index 00000000..84da9f38 --- /dev/null +++ b/src/libs/event-sourcing/repository/event-store-repository.port.ts @@ -0,0 +1,12 @@ +import { AggregateRoot } from '@libs/ddd/aggregate-root.base'; +import { EventEnvelope } from '../models/event-envelope'; +import { EventEnvelopeFilter } from '../interfaces/event-store.interface'; + +export type { AggregateID } from '@libs/ddd/entity.base'; +export type { EventEnvelopeFilter }; + +export interface EventStoreRepositoryPort { + save(aggregate: A): Promise; + findById(id: string): Promise; + findEnvelopes(filter: EventEnvelopeFilter): Promise; +} diff --git a/src/libs/event-sourcing/services/event-bus.ts b/src/libs/event-sourcing/services/event-bus.ts new file mode 100644 index 00000000..ca56eaef --- /dev/null +++ b/src/libs/event-sourcing/services/event-bus.ts @@ -0,0 +1,18 @@ +import { Injectable } from '@nestjs/common'; +import { Subject, Subscription } from 'rxjs'; +import { EventEnvelope } from '../models/event-envelope'; + +@Injectable() +export class EventBus { + private readonly subject = new Subject(); + + publish(envelopes: EventEnvelope[]): void { + for (const envelope of envelopes) { + this.subject.next(envelope); + } + } + + subscribe(observer: (envelope: EventEnvelope) => void): Subscription { + return this.subject.subscribe(observer); + } +} diff --git a/src/libs/event-sourcing/services/event-map.ts b/src/libs/event-sourcing/services/event-map.ts new file mode 100644 index 00000000..a26310aa --- /dev/null +++ b/src/libs/event-sourcing/services/event-map.ts @@ -0,0 +1,65 @@ +import { Injectable } from '@nestjs/common'; +import { DomainEvent } from '@libs/ddd/domain-event.base'; +import { EVENT_NAME_METADATA } from '../decorators/event.decorator'; +import { IEventPayload } from '../interfaces/event-payload.type'; +import { IEventSerializer } from '../interfaces/event-serializer.interface'; +import { UnregisteredEventException } from '../exceptions/unregistered-event.exception'; + +type Constructor = new (...args: any[]) => T; + +interface EventRegistration { + eventClass: Constructor; + serializer?: IEventSerializer; +} + +@Injectable() +export class EventMap { + private readonly registry = new Map(); + + register( + eventClass: Constructor, + serializer?: IEventSerializer, + ): void { + const name: string | undefined = Reflect.getMetadata( + EVENT_NAME_METADATA, + eventClass, + ); + if (!name) { + throw new Error( + `Event class ${eventClass.name} is missing the @Event('name') decorator`, + ); + } + this.registry.set(name, { eventClass, serializer }); + } + + get(eventName: string): EventRegistration { + const reg = this.registry.get(eventName); + if (!reg) { + throw new UnregisteredEventException(eventName); + } + return reg; + } + + serialize(event: DomainEvent): IEventPayload { + const name: string | undefined = Reflect.getMetadata( + EVENT_NAME_METADATA, + event.constructor, + ); + if (!name) { + return { ...event } as IEventPayload; + } + const reg = this.registry.get(name); + if (reg?.serializer) { + return reg.serializer.serialize(event); + } + return { ...event } as IEventPayload; + } + + deserialize(eventName: string, payload: IEventPayload): DomainEvent { + const { eventClass, serializer } = this.get(eventName); + if (serializer) { + return serializer.deserialize(payload); + } + return Object.assign(Object.create(eventClass.prototype), payload); + } +} diff --git a/src/libs/exceptions/exception.base.ts b/src/libs/exceptions/exception.base.ts index 4201235e..de8d0073 100644 --- a/src/libs/exceptions/exception.base.ts +++ b/src/libs/exceptions/exception.base.ts @@ -43,8 +43,12 @@ export abstract class ExceptionBase extends Error { ) { super(message); Error.captureStackTrace(this, this.constructor); - const ctx = RequestContextService.getContext(); - this.correlationId = ctx.requestId; + try { + const ctx = RequestContextService.getContext(); + this.correlationId = ctx.requestId; + } catch { + this.correlationId = 'no-request-context'; + } } /** diff --git a/src/modules/wallet/application/event-handlers/create-wallet-when-user-is-created.domain-event-handler.ts b/src/modules/wallet/application/event-handlers/create-wallet-when-user-is-created.domain-event-handler.ts index 770aa9d4..ec2a8cd5 100644 --- a/src/modules/wallet/application/event-handlers/create-wallet-when-user-is-created.domain-event-handler.ts +++ b/src/modules/wallet/application/event-handlers/create-wallet-when-user-is-created.domain-event-handler.ts @@ -1,6 +1,6 @@ import { UserCreatedDomainEvent } from '@modules/user/domain/events/user-created.domain-event'; import { WalletRepositoryPort } from '@modules/wallet/database/wallet.repository.port'; -import { WalletEntity } from '../../domain/wallet.entity'; +import { WalletAggregate } from '../../domain/wallet.aggregate'; import { OnEvent } from '@nestjs/event-emitter'; import { Inject, Injectable } from '@nestjs/common'; import { WALLET_REPOSITORY } from '../../wallet.di-tokens'; @@ -12,12 +12,9 @@ export class CreateWalletWhenUserIsCreatedDomainEventHandler { private readonly walletRepo: WalletRepositoryPort, ) {} - // Handle a Domain Event by performing changes to other aggregates (inside the same Domain). @OnEvent(UserCreatedDomainEvent.name, { async: true, promisify: true }) async handle(event: UserCreatedDomainEvent): Promise { - const wallet = WalletEntity.create({ - userId: event.aggregateId, - }); - return this.walletRepo.insert(wallet); + const wallet = WalletAggregate.create(event.aggregateId); + return this.walletRepo.save(wallet); } } diff --git a/src/modules/wallet/commands/deposit-money/deposit-money.command.ts b/src/modules/wallet/commands/deposit-money/deposit-money.command.ts new file mode 100644 index 00000000..1781306e --- /dev/null +++ b/src/modules/wallet/commands/deposit-money/deposit-money.command.ts @@ -0,0 +1,12 @@ +import { Command, CommandProps } from '@libs/ddd'; + +export class DepositMoneyCommand extends Command { + readonly walletId: string; + readonly amount: number; + + constructor(props: CommandProps) { + super(props); + this.walletId = props.walletId; + this.amount = props.amount; + } +} diff --git a/src/modules/wallet/commands/deposit-money/deposit-money.http.controller.ts b/src/modules/wallet/commands/deposit-money/deposit-money.http.controller.ts new file mode 100644 index 00000000..261e7f38 --- /dev/null +++ b/src/modules/wallet/commands/deposit-money/deposit-money.http.controller.ts @@ -0,0 +1,51 @@ +import { + Body, + Controller, + HttpStatus, + NotFoundException, + Param, + Post, +} from '@nestjs/common'; +import { CommandBus } from '@nestjs/cqrs'; +import { ApiOperation, ApiResponse } from '@nestjs/swagger'; +import { match } from 'oxide.ts'; +import { routesV1 } from '@config/app.routes'; +import { ApiErrorResponse } from '@src/libs/api/api-error.response'; +import { DepositMoneyCommand } from './deposit-money.command'; +import { DepositMoneyRequestDto } from './deposit-money.request.dto'; +import { WalletNotFoundError } from '../../domain/wallet.errors'; +import { DepositMoneyResult } from './deposit-money.service'; + +export class DepositMoneyResponseDto { + walletId: string; + balance: number; +} + +@Controller(routesV1.version) +export class DepositMoneyHttpController { + constructor(private readonly commandBus: CommandBus) {} + + @Post(routesV1.transaction.deposit) + @ApiOperation({ summary: 'Deposit money into a wallet' }) + @ApiResponse({ status: HttpStatus.OK, type: DepositMoneyResponseDto }) + @ApiResponse({ status: HttpStatus.NOT_FOUND, type: ApiErrorResponse }) + @ApiResponse({ status: HttpStatus.BAD_REQUEST, type: ApiErrorResponse }) + async deposit( + @Param('walletId') walletId: string, + @Body() body: DepositMoneyRequestDto, + ): Promise { + const command = new DepositMoneyCommand({ walletId, amount: body.amount }); + + const result: DepositMoneyResult = await this.commandBus.execute(command); + + return match(result, { + Ok: ({ walletId: wid, balance }) => ({ walletId: wid, balance }), + Err: (error: Error) => { + if (error instanceof WalletNotFoundError) { + throw new NotFoundException(error.message); + } + throw error; + }, + }); + } +} diff --git a/src/modules/wallet/commands/deposit-money/deposit-money.request.dto.ts b/src/modules/wallet/commands/deposit-money/deposit-money.request.dto.ts new file mode 100644 index 00000000..d28cd6cf --- /dev/null +++ b/src/modules/wallet/commands/deposit-money/deposit-money.request.dto.ts @@ -0,0 +1,12 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { IsNumber, IsPositive } from 'class-validator'; + +export class DepositMoneyRequestDto { + @ApiProperty({ + example: 100.0, + description: 'Amount to deposit — must be a positive number', + }) + @IsNumber() + @IsPositive() + readonly amount: number; +} diff --git a/src/modules/wallet/commands/deposit-money/deposit-money.service.ts b/src/modules/wallet/commands/deposit-money/deposit-money.service.ts new file mode 100644 index 00000000..1107fe6e --- /dev/null +++ b/src/modules/wallet/commands/deposit-money/deposit-money.service.ts @@ -0,0 +1,43 @@ +import { Inject, Logger } from '@nestjs/common'; +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Err, Ok, Result } from 'oxide.ts'; +import { WALLET_REPOSITORY } from '@modules/wallet/wallet.di-tokens'; +import { WalletRepositoryPort } from '@modules/wallet/database/wallet.repository.port'; +import { WalletNotFoundError } from '@modules/wallet/domain/wallet.errors'; +import { DepositMoneyCommand } from './deposit-money.command'; + +export type DepositMoneyResult = Result< + { walletId: string; balance: number }, + WalletNotFoundError +>; + +@CommandHandler(DepositMoneyCommand) +export class DepositMoneyService + implements ICommandHandler +{ + constructor( + @Inject(WALLET_REPOSITORY) + private readonly walletRepo: WalletRepositoryPort, + @Inject(Logger) + private readonly logger: Logger, + ) {} + + async execute(command: DepositMoneyCommand): Promise { + this.logger.debug( + `[DepositMoneyService] walletId=${command.walletId}, amount=${command.amount}`, + ); + + const wallet = await this.walletRepo.findById(command.walletId); + if (!wallet) { + return Err(new WalletNotFoundError({ walletId: command.walletId })); + } + + wallet.deposit(command.amount); + await this.walletRepo.save(wallet); + + return Ok({ + walletId: command.walletId, + balance: wallet.getProps().balance, + }); + } +} diff --git a/src/modules/wallet/commands/withdraw-money/withdraw-money.command.ts b/src/modules/wallet/commands/withdraw-money/withdraw-money.command.ts new file mode 100644 index 00000000..5dd25a39 --- /dev/null +++ b/src/modules/wallet/commands/withdraw-money/withdraw-money.command.ts @@ -0,0 +1,12 @@ +import { Command, CommandProps } from '@libs/ddd'; + +export class WithdrawMoneyCommand extends Command { + readonly walletId: string; + readonly amount: number; + + constructor(props: CommandProps) { + super(props); + this.walletId = props.walletId; + this.amount = props.amount; + } +} diff --git a/src/modules/wallet/commands/withdraw-money/withdraw-money.http.controller.ts b/src/modules/wallet/commands/withdraw-money/withdraw-money.http.controller.ts new file mode 100644 index 00000000..22c73ef8 --- /dev/null +++ b/src/modules/wallet/commands/withdraw-money/withdraw-money.http.controller.ts @@ -0,0 +1,62 @@ +import { + Body, + Controller, + HttpStatus, + NotFoundException, + Param, + Post, + UnprocessableEntityException, +} from '@nestjs/common'; +import { CommandBus } from '@nestjs/cqrs'; +import { ApiOperation, ApiResponse } from '@nestjs/swagger'; +import { match } from 'oxide.ts'; +import { routesV1 } from '@config/app.routes'; +import { ApiErrorResponse } from '@src/libs/api/api-error.response'; +import { WithdrawMoneyCommand } from './withdraw-money.command'; +import { WithdrawMoneyRequestDto } from './withdraw-money.request.dto'; +import { + WalletNotFoundError, + WalletNotEnoughBalanceError, +} from '../../domain/wallet.errors'; +import { WithdrawMoneyResult } from './withdraw-money.service'; + +export class WithdrawMoneyResponseDto { + walletId: string; + balance: number; +} + +@Controller(routesV1.version) +export class WithdrawMoneyHttpController { + constructor(private readonly commandBus: CommandBus) {} + + @Post(routesV1.transaction.withdraw) + @ApiOperation({ summary: 'Withdraw money from a wallet' }) + @ApiResponse({ status: HttpStatus.OK, type: WithdrawMoneyResponseDto }) + @ApiResponse({ status: HttpStatus.NOT_FOUND, type: ApiErrorResponse }) + @ApiResponse({ + status: HttpStatus.UNPROCESSABLE_ENTITY, + type: ApiErrorResponse, + }) + @ApiResponse({ status: HttpStatus.BAD_REQUEST, type: ApiErrorResponse }) + async withdraw( + @Param('walletId') walletId: string, + @Body() body: WithdrawMoneyRequestDto, + ): Promise { + const command = new WithdrawMoneyCommand({ walletId, amount: body.amount }); + + const result: WithdrawMoneyResult = await this.commandBus.execute(command); + + return match(result, { + Ok: ({ walletId: wid, balance }) => ({ walletId: wid, balance }), + Err: (error: Error) => { + if (error instanceof WalletNotFoundError) { + throw new NotFoundException(error.message); + } + if (error instanceof WalletNotEnoughBalanceError) { + throw new UnprocessableEntityException(error.message); + } + throw error; + }, + }); + } +} diff --git a/src/modules/wallet/commands/withdraw-money/withdraw-money.request.dto.ts b/src/modules/wallet/commands/withdraw-money/withdraw-money.request.dto.ts new file mode 100644 index 00000000..00b7d702 --- /dev/null +++ b/src/modules/wallet/commands/withdraw-money/withdraw-money.request.dto.ts @@ -0,0 +1,12 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { IsNumber, IsPositive } from 'class-validator'; + +export class WithdrawMoneyRequestDto { + @ApiProperty({ + example: 50.0, + description: 'Amount to withdraw — must be a positive number', + }) + @IsNumber() + @IsPositive() + readonly amount: number; +} diff --git a/src/modules/wallet/commands/withdraw-money/withdraw-money.service.ts b/src/modules/wallet/commands/withdraw-money/withdraw-money.service.ts new file mode 100644 index 00000000..ba3c6c0d --- /dev/null +++ b/src/modules/wallet/commands/withdraw-money/withdraw-money.service.ts @@ -0,0 +1,50 @@ +import { Inject, Logger } from '@nestjs/common'; +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Err, Ok, Result } from 'oxide.ts'; +import { WALLET_REPOSITORY } from '@modules/wallet/wallet.di-tokens'; +import { WalletRepositoryPort } from '@modules/wallet/database/wallet.repository.port'; +import { + WalletNotFoundError, + WalletNotEnoughBalanceError, +} from '@modules/wallet/domain/wallet.errors'; +import { WithdrawMoneyCommand } from './withdraw-money.command'; + +export type WithdrawMoneyResult = Result< + { walletId: string; balance: number }, + WalletNotFoundError | WalletNotEnoughBalanceError +>; + +@CommandHandler(WithdrawMoneyCommand) +export class WithdrawMoneyService + implements ICommandHandler +{ + constructor( + @Inject(WALLET_REPOSITORY) + private readonly walletRepo: WalletRepositoryPort, + @Inject(Logger) + private readonly logger: Logger, + ) {} + + async execute(command: WithdrawMoneyCommand): Promise { + this.logger.debug( + `[WithdrawMoneyService] walletId=${command.walletId}, amount=${command.amount}`, + ); + + const wallet = await this.walletRepo.findById(command.walletId); + if (!wallet) { + return Err(new WalletNotFoundError({ walletId: command.walletId })); + } + + const withdrawResult = wallet.withdraw(command.amount); + if (withdrawResult.isErr()) { + return Err(withdrawResult.unwrapErr()); + } + + await this.walletRepo.save(wallet); + + return Ok({ + walletId: command.walletId, + balance: wallet.getProps().balance, + }); + } +} diff --git a/src/modules/wallet/database/wallet-transaction-read.mapper.ts b/src/modules/wallet/database/wallet-transaction-read.mapper.ts new file mode 100644 index 00000000..2c9f7ede --- /dev/null +++ b/src/modules/wallet/database/wallet-transaction-read.mapper.ts @@ -0,0 +1,25 @@ +import { RawTransactionRow } from './wallet-transaction-read.repository.port'; +import { TransactionResponseDto } from '../dtos/transaction.response.dto'; + +export interface TransactionReadDbRecord { + readonly eventId: string; + readonly walletId: string; + readonly type: 'credit' | 'debit'; + readonly amount: number; + readonly balanceAfter: number; + readonly occurredOn: Date; +} + +export class WalletTransactionReadMapper { + toResponse(row: RawTransactionRow): TransactionResponseDto { + return { + id: row.id, + eventId: row.event_id, + walletId: row.wallet_id, + type: row.type, + amount: Number(row.amount), + balanceAfter: Number(row.balance_after), + occurredOn: row.occurred_on.toISOString(), + }; + } +} diff --git a/src/modules/wallet/database/wallet-transaction-read.repository.port.ts b/src/modules/wallet/database/wallet-transaction-read.repository.port.ts new file mode 100644 index 00000000..cf626d01 --- /dev/null +++ b/src/modules/wallet/database/wallet-transaction-read.repository.port.ts @@ -0,0 +1,17 @@ +export interface RawTransactionRow { + readonly id: string; + readonly event_id: string; + readonly wallet_id: string; + readonly type: 'credit' | 'debit'; + readonly amount: number; + readonly balance_after: number; + readonly occurred_on: Date; +} + +export interface WalletTransactionReadRepositoryPort { + findByWalletId( + walletId: string, + page: number, + limit: number, + ): Promise<{ rows: RawTransactionRow[]; total: number }>; +} diff --git a/src/modules/wallet/database/wallet-transaction-read.repository.ts b/src/modules/wallet/database/wallet-transaction-read.repository.ts new file mode 100644 index 00000000..3430d9db --- /dev/null +++ b/src/modules/wallet/database/wallet-transaction-read.repository.ts @@ -0,0 +1,60 @@ +import { Injectable } from '@nestjs/common'; +import { InjectPool } from 'nestjs-slonik'; +import { DatabasePool, sql } from 'slonik'; +import { z } from 'zod'; +import { + RawTransactionRow, + WalletTransactionReadRepositoryPort, +} from './wallet-transaction-read.repository.port'; + +const transactionReadSchema = z.object({ + id: z.string(), + event_id: z.string(), + wallet_id: z.string(), + type: z.enum(['credit', 'debit']), + amount: z.number(), + balance_after: z.number(), + occurred_on: z.preprocess( + (val: unknown) => new Date(val as string), + z.date(), + ), +}); + +@Injectable() +export class WalletTransactionReadRepository + implements WalletTransactionReadRepositoryPort +{ + constructor(@InjectPool() private readonly pool: DatabasePool) {} + + async findByWalletId( + walletId: string, + page: number, + limit: number, + ): Promise<{ rows: RawTransactionRow[]; total: number }> { + const offset = (page - 1) * limit; + + const statement = sql.type(transactionReadSchema)` + SELECT id, event_id, wallet_id, type, amount, balance_after, occurred_on + FROM transaction_reads + WHERE wallet_id = ${walletId}::uuid + ORDER BY occurred_on DESC + LIMIT ${limit} + OFFSET ${offset} + `; + + const countStatement = sql` + SELECT COUNT(*)::int AS total + FROM transaction_reads + WHERE wallet_id = ${walletId}::uuid + `; + + const [records, countResult] = await Promise.all([ + this.pool.query(statement), + this.pool.query(countStatement), + ]); + + const total = (countResult.rows[0] as any)?.total ?? 0; + + return { rows: records.rows as RawTransactionRow[], total }; + } +} diff --git a/src/modules/wallet/database/wallet.repository.port.ts b/src/modules/wallet/database/wallet.repository.port.ts index c76cc976..5caee33b 100644 --- a/src/modules/wallet/database/wallet.repository.port.ts +++ b/src/modules/wallet/database/wallet.repository.port.ts @@ -1,4 +1,4 @@ -import { RepositoryPort } from '@libs/ddd'; -import { WalletEntity } from '../domain/wallet.entity'; +import { EventStoreRepositoryPort } from '@libs/event-sourcing'; +import { WalletAggregate } from '../domain/wallet.aggregate'; -export type WalletRepositoryPort = RepositoryPort; +export type WalletRepositoryPort = EventStoreRepositoryPort; diff --git a/src/modules/wallet/database/wallet.repository.ts b/src/modules/wallet/database/wallet.repository.ts index d4e5da27..d9b83a1d 100644 --- a/src/modules/wallet/database/wallet.repository.ts +++ b/src/modules/wallet/database/wallet.repository.ts @@ -1,38 +1,78 @@ -import { InjectPool } from 'nestjs-slonik'; -import { DatabasePool } from 'slonik'; -import { z } from 'zod'; -import { SqlRepositoryBase } from '@src/libs/db/sql-repository.base'; -import { WalletRepositoryPort } from './wallet.repository.port'; -import { WalletEntity } from '../domain/wallet.entity'; -import { WalletMapper } from '../wallet.mapper'; -import { Injectable, Logger } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; - -export const walletSchema = z.object({ - id: z.string().min(1).max(255), - createdAt: z.preprocess((val: any) => new Date(val), z.date()), - updatedAt: z.preprocess((val: any) => new Date(val), z.date()), - balance: z.number().min(0).max(9999999), - userId: z.string().min(1).max(255), -}); - -export type WalletModel = z.TypeOf; +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { + EventStoreRepositoryBase, + EVENT_STORE, + SNAPSHOT_STORE, + EVENT_BUS, + EventBus, +} from '@libs/event-sourcing'; +import { EventMap } from '@libs/event-sourcing/services/event-map'; +import { IEventStore } from '@libs/event-sourcing/interfaces/event-store.interface'; +import { ISnapshotStore } from '@libs/event-sourcing/interfaces/snapshot-store.interface'; +import { Snapshot } from '@libs/event-sourcing/models/snapshot'; +import { WalletAggregate } from '../domain/wallet.aggregate'; +import { Transaction } from '../domain/entities/transaction.entity'; @Injectable() -export class WalletRepository - extends SqlRepositoryBase - implements WalletRepositoryPort -{ - protected tableName = 'wallets'; - - protected schema = walletSchema; +export class WalletEventStoreRepository extends EventStoreRepositoryBase { + protected readonly poolName = 'wallets'; constructor( - @InjectPool() - pool: DatabasePool, - mapper: WalletMapper, - eventEmitter: EventEmitter2, + @Inject(EVENT_STORE) eventStore: IEventStore, + @Inject(SNAPSHOT_STORE) snapshotStore: ISnapshotStore, + @Inject(EVENT_BUS) eventBus: EventBus, + @Inject(Logger) logger: Logger, + eventMap: EventMap, ) { - super(pool, mapper, eventEmitter, new Logger(WalletRepository.name)); + super( + eventStore, + snapshotStore, + WalletAggregate, + eventBus, + logger, + eventMap, + ); + } + + protected toSnapshotPayload( + aggregate: WalletAggregate, + ): Record { + const props = aggregate.getProps(); + return { + userId: props.userId, + balance: props.balance, + version: aggregate.version, + transactions: props.transactions.map((tx) => ({ + id: tx.id, + type: tx.type, + amount: tx.amount, + balanceAfter: tx.balanceAfter, + occurredOn: tx.occurredOn.toISOString(), + })), + }; + } + + protected fromSnapshot(snapshot: Snapshot): WalletAggregate { + const aggregate = new WalletAggregate(); + (aggregate as any)._id = snapshot.aggregateId; + const transactions = ((snapshot.payload.transactions as any[]) ?? []).map( + (t: any) => + new Transaction({ + id: t.id, + props: { + type: t.type, + amount: t.amount, + balanceAfter: t.balanceAfter, + occurredOn: new Date(t.occurredOn), + }, + }), + ); + (aggregate as any).props = { + userId: snapshot.payload.userId, + balance: snapshot.payload.balance, + transactions, + }; + aggregate.version = snapshot.aggregateVersion; + return aggregate; } } diff --git a/src/modules/wallet/domain/entities/transaction.entity.ts b/src/modules/wallet/domain/entities/transaction.entity.ts new file mode 100644 index 00000000..14ca3937 --- /dev/null +++ b/src/modules/wallet/domain/entities/transaction.entity.ts @@ -0,0 +1,34 @@ +import { Entity, AggregateID } from '@libs/ddd'; + +export type TransactionType = 'credit' | 'debit'; + +export interface TransactionProps { + readonly type: TransactionType; + readonly amount: number; + readonly balanceAfter: number; + readonly occurredOn: Date; +} + +export class Transaction extends Entity { + protected _id: AggregateID = ''; + + get type(): TransactionType { + return this.props.type; + } + + get amount(): number { + return this.props.amount; + } + + get balanceAfter(): number { + return this.props.balanceAfter; + } + + get occurredOn(): Date { + return this.props.occurredOn; + } + + validate(): void { + // Invariants enforced at aggregate level before entity creation + } +} diff --git a/src/modules/wallet/domain/events/money-deposited.domain-event.ts b/src/modules/wallet/domain/events/money-deposited.domain-event.ts new file mode 100644 index 00000000..035d9a2c --- /dev/null +++ b/src/modules/wallet/domain/events/money-deposited.domain-event.ts @@ -0,0 +1,16 @@ +import { DomainEvent, DomainEventProps } from '@libs/ddd'; +import { Event } from '@libs/event-sourcing'; + +@Event('money-deposited') +export class MoneyDepositedDomainEvent extends DomainEvent { + readonly transactionId: string; + readonly amount: number; + readonly balanceAfter: number; + + constructor(props: DomainEventProps) { + super(props); + this.transactionId = props.transactionId; + this.amount = props.amount; + this.balanceAfter = props.balanceAfter; + } +} diff --git a/src/modules/wallet/domain/events/money-withdrawn.domain-event.ts b/src/modules/wallet/domain/events/money-withdrawn.domain-event.ts new file mode 100644 index 00000000..fa4df7ac --- /dev/null +++ b/src/modules/wallet/domain/events/money-withdrawn.domain-event.ts @@ -0,0 +1,16 @@ +import { DomainEvent, DomainEventProps } from '@libs/ddd'; +import { Event } from '@libs/event-sourcing'; + +@Event('money-withdrawn') +export class MoneyWithdrawnDomainEvent extends DomainEvent { + readonly transactionId: string; + readonly amount: number; + readonly balanceAfter: number; + + constructor(props: DomainEventProps) { + super(props); + this.transactionId = props.transactionId; + this.amount = props.amount; + this.balanceAfter = props.balanceAfter; + } +} diff --git a/src/modules/wallet/domain/events/wallet-created.domain-event.ts b/src/modules/wallet/domain/events/wallet-created.domain-event.ts index 9ac1d066..31a74e26 100644 --- a/src/modules/wallet/domain/events/wallet-created.domain-event.ts +++ b/src/modules/wallet/domain/events/wallet-created.domain-event.ts @@ -1,9 +1,12 @@ import { DomainEvent, DomainEventProps } from '@libs/ddd'; +import { Event } from '@libs/event-sourcing'; +@Event('wallet-created') export class WalletCreatedDomainEvent extends DomainEvent { readonly userId: string; constructor(props: DomainEventProps) { super(props); + this.userId = props.userId; } } diff --git a/src/modules/wallet/domain/wallet.aggregate.ts b/src/modules/wallet/domain/wallet.aggregate.ts new file mode 100644 index 00000000..4cd75728 --- /dev/null +++ b/src/modules/wallet/domain/wallet.aggregate.ts @@ -0,0 +1,106 @@ +import { AggregateRoot, AggregateID } from '@libs/ddd'; +import { Err, Ok, Result } from 'oxide.ts'; +import { Aggregate, EventHandler } from '@libs/event-sourcing'; +import { randomUUID } from 'crypto'; +import { Transaction } from './entities/transaction.entity'; +import { WalletCreatedDomainEvent } from './events/wallet-created.domain-event'; +import { MoneyDepositedDomainEvent } from './events/money-deposited.domain-event'; +import { MoneyWithdrawnDomainEvent } from './events/money-withdrawn.domain-event'; +import { WalletNotEnoughBalanceError } from './wallet.errors'; + +export interface WalletProps { + userId: string; + balance: number; + transactions: Transaction[]; +} + +@Aggregate('wallet') +export class WalletAggregate extends AggregateRoot { + protected _id: AggregateID = ''; + + static create(userId: string): WalletAggregate { + const id = randomUUID(); + const wallet = new WalletAggregate(); + wallet.applyEvent( + new WalletCreatedDomainEvent({ aggregateId: id, userId }), + ); + return wallet; + } + + deposit(amount: number): void { + const transactionId = randomUUID(); + const balanceAfter = this.props.balance + amount; + this.applyEvent( + new MoneyDepositedDomainEvent({ + aggregateId: this._id, + transactionId, + amount, + balanceAfter, + }), + ); + } + + withdraw(amount: number): Result { + if (this.props.balance - amount < 0) { + return Err(new WalletNotEnoughBalanceError()); + } + const transactionId = randomUUID(); + const balanceAfter = this.props.balance - amount; + this.applyEvent( + new MoneyWithdrawnDomainEvent({ + aggregateId: this._id, + transactionId, + amount, + balanceAfter, + }), + ); + return Ok(null); + } + + getTransactions(): ReadonlyArray { + return this.props.transactions; + } + + @EventHandler(WalletCreatedDomainEvent) + private onWalletCreated(event: WalletCreatedDomainEvent): void { + this._id = event.aggregateId; + (this as any).props = { + userId: event.userId, + balance: 0, + transactions: [], + }; + } + + @EventHandler(MoneyDepositedDomainEvent) + private onMoneyDeposited(event: MoneyDepositedDomainEvent): void { + const tx = new Transaction({ + id: event.transactionId, + props: { + type: 'credit', + amount: event.amount, + balanceAfter: event.balanceAfter, + occurredOn: new Date(event.metadata.timestamp), + }, + }); + this.props.transactions.push(tx); + this.props.balance = event.balanceAfter; + } + + @EventHandler(MoneyWithdrawnDomainEvent) + private onMoneyWithdrawn(event: MoneyWithdrawnDomainEvent): void { + const tx = new Transaction({ + id: event.transactionId, + props: { + type: 'debit', + amount: event.amount, + balanceAfter: event.balanceAfter, + occurredOn: new Date(event.metadata.timestamp), + }, + }); + this.props.transactions.push(tx); + this.props.balance = event.balanceAfter; + } + + // eslint-disable-next-line @typescript-eslint/no-empty-function + validate(): void {} +} diff --git a/src/modules/wallet/domain/wallet.errors.ts b/src/modules/wallet/domain/wallet.errors.ts index d43ead17..99f69a92 100644 --- a/src/modules/wallet/domain/wallet.errors.ts +++ b/src/modules/wallet/domain/wallet.errors.ts @@ -1,5 +1,15 @@ import { ExceptionBase } from '@libs/exceptions'; +export class WalletNotFoundError extends ExceptionBase { + static readonly message = 'Wallet not found'; + + public readonly code = 'WALLET.NOT_FOUND'; + + constructor(metadata?: unknown) { + super(WalletNotFoundError.message, undefined, metadata); + } +} + export class WalletNotEnoughBalanceError extends ExceptionBase { static readonly message = 'Wallet has not enough balance'; diff --git a/src/modules/wallet/dtos/transaction.response.dto.ts b/src/modules/wallet/dtos/transaction.response.dto.ts new file mode 100644 index 00000000..26f6834c --- /dev/null +++ b/src/modules/wallet/dtos/transaction.response.dto.ts @@ -0,0 +1,41 @@ +import { ApiProperty } from '@nestjs/swagger'; + +export class TransactionResponseDto { + @ApiProperty({ example: 'a1b2c3d4-e5f6-7890-abcd-ef1234567890' }) + id: string; + + @ApiProperty({ example: 'e5f6a7b8-c9d0-1234-ef56-789012345678' }) + eventId: string; + + @ApiProperty({ example: '550e8400-e29b-41d4-a716-446655440000' }) + walletId: string; + + @ApiProperty({ enum: ['credit', 'debit'], example: 'credit' }) + type: 'credit' | 'debit'; + + @ApiProperty({ example: 100.0 }) + amount: number; + + @ApiProperty({ example: 250.0 }) + balanceAfter: number; + + @ApiProperty({ example: '2026-04-07T12:00:00.000Z' }) + occurredOn: string; +} + +export class TransactionPaginatedResponseDto { + @ApiProperty({ example: '550e8400-e29b-41d4-a716-446655440000' }) + walletId: string; + + @ApiProperty({ type: [TransactionResponseDto] }) + transactions: TransactionResponseDto[]; + + @ApiProperty({ example: 42 }) + count: number; + + @ApiProperty({ example: 20 }) + limit: number; + + @ApiProperty({ example: 1 }) + page: number; +} diff --git a/src/modules/wallet/projections/wallet.projection.ts b/src/modules/wallet/projections/wallet.projection.ts new file mode 100644 index 00000000..c10afb14 --- /dev/null +++ b/src/modules/wallet/projections/wallet.projection.ts @@ -0,0 +1,76 @@ +import { + Injectable, + OnModuleInit, + OnModuleDestroy, + Inject, + Logger, +} from '@nestjs/common'; +import { InjectPool } from 'nestjs-slonik'; +import { DatabasePool, sql } from 'slonik'; +import { Subscription } from 'rxjs'; +import { EventBus, EventEnvelope, EVENT_BUS } from '@libs/event-sourcing'; + +@Injectable() +export class WalletProjectionHandler implements OnModuleInit, OnModuleDestroy { + private subscription: Subscription; + + constructor( + @Inject(EVENT_BUS) private readonly eventBus: EventBus, + @InjectPool() private readonly pool: DatabasePool, + @Inject(Logger) private readonly logger: Logger, + ) {} + + onModuleInit(): void { + this.subscription = this.eventBus.subscribe( + (envelope) => + void this.handle(envelope).catch((err) => + this.logger.error(`WalletProjection error: ${err?.message}`, err), + ), + ); + } + + onModuleDestroy(): void { + this.subscription?.unsubscribe(); + } + + private async handle(envelope: EventEnvelope): Promise { + const { event, payload, metadata } = envelope; + + if (event === 'wallet-created') { + await this.pool.query(sql` + INSERT INTO wallets (id, "userId", balance, "createdAt", "updatedAt") + VALUES ( + ${metadata.aggregateId}, + ${payload['userId'] as string}, + 0, + ${metadata.occurredOn.toISOString()}::timestamptz, + ${metadata.occurredOn.toISOString()}::timestamptz + ) + ON CONFLICT (id) DO NOTHING + `); + } else if (event === 'money-deposited' || event === 'money-withdrawn') { + const type = event === 'money-deposited' ? 'credit' : 'debit'; + + await this.pool.query(sql` + UPDATE wallets + SET balance = ${payload['balanceAfter'] as number}, "updatedAt" = NOW() + WHERE id = ${metadata.aggregateId} + `); + + await this.pool.query(sql` + INSERT INTO transaction_reads + (id, event_id, wallet_id, type, amount, balance_after, occurred_on) + VALUES ( + ${payload['transactionId'] as string}, + ${metadata.eventId}, + ${metadata.aggregateId}, + ${type}, + ${payload['amount'] as number}, + ${payload['balanceAfter'] as number}, + ${metadata.occurredOn.toISOString()}::timestamptz + ) + ON CONFLICT (event_id) DO NOTHING + `); + } + } +} diff --git a/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.http.controller.ts b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.http.controller.ts new file mode 100644 index 00000000..b7183f1b --- /dev/null +++ b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.http.controller.ts @@ -0,0 +1,36 @@ +import { Controller, Get, HttpStatus, Param, Query } from '@nestjs/common'; +import { QueryBus } from '@nestjs/cqrs'; +import { ApiOperation, ApiResponse } from '@nestjs/swagger'; +import { Result } from 'oxide.ts'; +import { routesV1 } from '@config/app.routes'; +import { PaginatedQueryRequestDto } from '@src/libs/api/paginated-query.request.dto'; +import { GetWalletTransactionsParamDto } from './get-wallet-transactions.request.dto'; +import { GetWalletTransactionsQuery } from './get-wallet-transactions.query'; +import { TransactionPaginatedResponseDto } from '../../dtos/transaction.response.dto'; + +@Controller(routesV1.version) +export class GetWalletTransactionsHttpController { + constructor(private readonly queryBus: QueryBus) {} + + @Get(routesV1.transaction.getTransactions) + @ApiOperation({ summary: 'Get paginated transaction history for a wallet' }) + @ApiResponse({ + status: HttpStatus.OK, + type: TransactionPaginatedResponseDto, + }) + async getTransactions( + @Param() params: GetWalletTransactionsParamDto, + @Query() queryParams: PaginatedQueryRequestDto, + ): Promise { + const query = new GetWalletTransactionsQuery({ + walletId: params.walletId, + limit: queryParams?.limit, + page: queryParams?.page, + }); + + const result: Result = + await this.queryBus.execute(query); + + return result.unwrap(); + } +} diff --git a/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.query-handler.ts b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.query-handler.ts new file mode 100644 index 00000000..9cff5020 --- /dev/null +++ b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.query-handler.ts @@ -0,0 +1,51 @@ +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Inject, Logger } from '@nestjs/common'; +import { Ok, Result } from 'oxide.ts'; +import { GetWalletTransactionsQuery } from './get-wallet-transactions.query'; +import { + TransactionPaginatedResponseDto, + TransactionResponseDto, +} from '../../dtos/transaction.response.dto'; +import { WalletTransactionReadRepositoryPort } from '../../database/wallet-transaction-read.repository.port'; +import { WalletTransactionReadMapper } from '../../database/wallet-transaction-read.mapper'; +import { WALLET_READ_REPOSITORY } from '../../wallet.di-tokens'; + +@QueryHandler(GetWalletTransactionsQuery) +export class GetWalletTransactionsQueryHandler implements IQueryHandler { + constructor( + @Inject(WALLET_READ_REPOSITORY) + private readonly readRepo: WalletTransactionReadRepositoryPort, + private readonly mapper: WalletTransactionReadMapper, + @Inject(Logger) + private readonly logger: Logger, + ) {} + + async execute( + query: GetWalletTransactionsQuery, + ): Promise> { + const limit = Math.min(query.limit ?? 20, 100); + const page = Math.max(query.page ?? 1, 1); + + this.logger.debug( + `[GetWalletTransactionsQueryHandler] walletId=${query.walletId}, page=${page}, limit=${limit}`, + ); + + const { rows, total } = await this.readRepo.findByWalletId( + query.walletId, + page, + limit, + ); + + const transactions: TransactionResponseDto[] = rows.map((row) => + this.mapper.toResponse(row), + ); + + return Ok({ + walletId: query.walletId, + transactions, + count: total, + limit, + page, + }); + } +} diff --git a/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.query.ts b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.query.ts new file mode 100644 index 00000000..2a914be9 --- /dev/null +++ b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.query.ts @@ -0,0 +1,10 @@ +import { PaginatedQueryBase, PaginatedParams } from '@libs/ddd/query.base'; + +export class GetWalletTransactionsQuery extends PaginatedQueryBase { + readonly walletId: string; + + constructor(props: PaginatedParams) { + super(props); + this.walletId = props.walletId; + } +} diff --git a/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.request.dto.ts b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.request.dto.ts new file mode 100644 index 00000000..bfc908fd --- /dev/null +++ b/src/modules/wallet/queries/get-wallet-transactions/get-wallet-transactions.request.dto.ts @@ -0,0 +1,11 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { IsUUID } from 'class-validator'; + +export class GetWalletTransactionsParamDto { + @ApiProperty({ + example: '550e8400-e29b-41d4-a716-446655440000', + description: 'Wallet ID', + }) + @IsUUID() + readonly walletId: string; +} diff --git a/src/modules/wallet/wallet.di-tokens.ts b/src/modules/wallet/wallet.di-tokens.ts index 954a06b0..8c58fcea 100644 --- a/src/modules/wallet/wallet.di-tokens.ts +++ b/src/modules/wallet/wallet.di-tokens.ts @@ -1 +1,2 @@ export const WALLET_REPOSITORY = Symbol('WALLET_REPOSITORY'); +export const WALLET_READ_REPOSITORY = Symbol('WALLET_READ_REPOSITORY'); diff --git a/src/modules/wallet/wallet.mapper.ts b/src/modules/wallet/wallet.mapper.ts deleted file mode 100644 index 6bd0ab94..00000000 --- a/src/modules/wallet/wallet.mapper.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { Mapper } from '@libs/ddd'; -import { Injectable } from '@nestjs/common'; -import { WalletEntity } from './domain/wallet.entity'; -import { WalletModel, walletSchema } from './database/wallet.repository'; - -@Injectable() -export class WalletMapper implements Mapper { - toPersistence(entity: WalletEntity): WalletModel { - const copy = entity.getProps(); - const record: WalletModel = { - id: copy.id, - createdAt: copy.createdAt, - updatedAt: copy.updatedAt, - userId: copy.userId, - balance: copy.balance, - }; - return walletSchema.parse(record); - } - - toDomain(record: WalletModel): WalletEntity { - const entity = new WalletEntity({ - id: record.id, - createdAt: record.createdAt, - updatedAt: record.updatedAt, - props: { - userId: record.userId, - balance: record.balance, - }, - }); - return entity; - } - - toResponse(): any { - throw new Error('Not implemented'); - } -} diff --git a/src/modules/wallet/wallet.module.ts b/src/modules/wallet/wallet.module.ts index 99ad0fe4..f2428bd0 100644 --- a/src/modules/wallet/wallet.module.ts +++ b/src/modules/wallet/wallet.module.ts @@ -1,22 +1,72 @@ import { Logger, Module, Provider } from '@nestjs/common'; +import { CqrsModule } from '@nestjs/cqrs'; +import { EventSourcingModule } from '@libs/event-sourcing'; + +import { WalletCreatedDomainEvent } from './domain/events/wallet-created.domain-event'; +import { MoneyDepositedDomainEvent } from './domain/events/money-deposited.domain-event'; +import { MoneyWithdrawnDomainEvent } from './domain/events/money-withdrawn.domain-event'; + import { CreateWalletWhenUserIsCreatedDomainEventHandler } from './application/event-handlers/create-wallet-when-user-is-created.domain-event-handler'; -import { WalletRepository } from './database/wallet.repository'; -import { WALLET_REPOSITORY } from './wallet.di-tokens'; -import { WalletMapper } from './wallet.mapper'; +import { WalletEventStoreRepository } from './database/wallet.repository'; +import { WalletProjectionHandler } from './projections/wallet.projection'; +import { WalletTransactionReadRepository } from './database/wallet-transaction-read.repository'; +import { WalletTransactionReadMapper } from './database/wallet-transaction-read.mapper'; + +import { DepositMoneyService } from './commands/deposit-money/deposit-money.service'; +import { DepositMoneyHttpController } from './commands/deposit-money/deposit-money.http.controller'; +import { WithdrawMoneyService } from './commands/withdraw-money/withdraw-money.service'; +import { WithdrawMoneyHttpController } from './commands/withdraw-money/withdraw-money.http.controller'; + +import { GetWalletTransactionsQueryHandler } from './queries/get-wallet-transactions/get-wallet-transactions.query-handler'; +import { GetWalletTransactionsHttpController } from './queries/get-wallet-transactions/get-wallet-transactions.http.controller'; + +import { WALLET_REPOSITORY, WALLET_READ_REPOSITORY } from './wallet.di-tokens'; const eventHandlers: Provider[] = [ CreateWalletWhenUserIsCreatedDomainEventHandler, ]; -const mappers: Provider[] = [WalletMapper]; +const commandHandlers: Provider[] = [DepositMoneyService, WithdrawMoneyService]; + +const queryHandlers: Provider[] = [GetWalletTransactionsQueryHandler]; const repositories: Provider[] = [ - { provide: WALLET_REPOSITORY, useClass: WalletRepository }, + { provide: WALLET_REPOSITORY, useClass: WalletEventStoreRepository }, + { + provide: WALLET_READ_REPOSITORY, + useClass: WalletTransactionReadRepository, + }, ]; +const mappers: Provider[] = [WalletTransactionReadMapper]; + +const projections: Provider[] = [WalletProjectionHandler]; + @Module({ - imports: [], - controllers: [], - providers: [Logger, ...eventHandlers, ...mappers, ...repositories], + imports: [ + CqrsModule, + EventSourcingModule.forFeature({ + events: [ + WalletCreatedDomainEvent, + MoneyDepositedDomainEvent, + MoneyWithdrawnDomainEvent, + ], + }), + ], + controllers: [ + DepositMoneyHttpController, + WithdrawMoneyHttpController, + GetWalletTransactionsHttpController, + ], + providers: [ + Logger, + ...eventHandlers, + ...commandHandlers, + ...queryHandlers, + ...repositories, + ...mappers, + ...projections, + ], + exports: [WALLET_REPOSITORY], }) export class WalletModule {} diff --git a/tests/behavioral/features/event-store-repository.feature b/tests/behavioral/features/event-store-repository.feature new file mode 100644 index 00000000..36c15ff8 --- /dev/null +++ b/tests/behavioral/features/event-store-repository.feature @@ -0,0 +1,67 @@ +Feature: Event Store Repository + + # US1 — Persist Domain Events via Repository + Scenario: Save uncommitted events to the event store + Given a domain aggregate with one or more uncommitted events + When the repository's save method is called + Then all uncommitted events are appended to the aggregate's event stream + And the aggregate has no remaining uncommitted events + + Scenario: No-op when aggregate has no uncommitted events + Given an aggregate with no uncommitted events + When the repository's save method is called + Then no events are written to the store + And the operation completes without error + + Scenario: Version conflict is detected and rejected + Given an aggregate that has already been saved + When the repository attempts to save it again with a stale version number + Then the operation fails with a version conflict error + And the event stream remains unchanged + + # US2 — Reconstruct Aggregate State from Event History + Scenario: Reconstruct aggregate by replaying full event history + Given an event stream containing multiple historical events for an aggregate + When findById is called with the aggregate's ID + Then it returns the aggregate with its state fully reconstructed in chronological order + + Scenario: Returns null for an unknown aggregate ID + Given no events exist for a given aggregate ID + When findById is called + Then it returns null + + Scenario: Uses snapshot as baseline and replays only tail events + Given a snapshot exists for an aggregate at version 5 + And 2 additional events exist after the snapshot + When the repository loads that aggregate + Then the aggregate version is 7 + And only the 2 tail events were replayed + + # US3 — Query Event History Across Aggregates + Scenario: Returns envelopes in date range across multiple aggregates + Given events from multiple aggregates stored in the default event pool + When a query for all envelopes within a date range is made + Then only envelopes within that range are returned in chronological order + + Scenario: Returns empty result when no events match the date filter + Given no events exist within the specified date range + When the query is executed + Then an empty result is returned without error + + # US4 — Automatic Event Publishing After Persistence + Scenario: Publishes envelopes to event bus after successful save + Given a subscriber registered for a specific event type + When an aggregate containing that event type is saved + Then the subscriber receives the event envelope after events are written to the store + + Scenario: Does not publish when appendEvents throws + Given the event store persistence fails mid-operation + When the save operation throws an error + Then no events are published to the event bus + + Scenario: Logs error and returns successfully when EventBus publish throws + Given the event store persists events successfully + And the event bus publish call throws an error + When save is called + Then save succeeds without rethrowing + And the error is logged via LoggerPort.error diff --git a/tests/behavioral/features/transaction.feature b/tests/behavioral/features/transaction.feature new file mode 100644 index 00000000..d4451e35 --- /dev/null +++ b/tests/behavioral/features/transaction.feature @@ -0,0 +1,102 @@ +Feature: Wallet Transaction Management + As a developer building wallet features + I want deposit, withdrawal, and transaction history operations + So that all wallet money movements are durably recorded and queryable + + Background: + Given a user exists in the system + And a wallet exists for that user with balance 0 + + # ─── Scenario 1: Successful deposit ──────────────────────────────────────── + + Scenario: Deposit money into a wallet + Given the wallet exists + When a deposit of 100.00 is made to the wallet + Then the wallet balance should be 100.00 + And a "money-deposited" event should exist in the transaction event store + And the event should have amount 100.00 and balance_after 100.00 + And the transaction read model should contain a "credit" record with amount 100.00 + + # ─── Scenario 2: Successful withdrawal ───────────────────────────────────── + + Scenario: Withdraw money from a wallet with sufficient balance + Given the wallet has a balance of 200.00 + When a withdrawal of 75.00 is made from the wallet + Then the wallet balance should be 125.00 + And a "money-withdrawn" event should exist in the transaction event store + And the event should have amount 75.00 and balance_after 125.00 + And the transaction read model should contain a "debit" record with amount 75.00 + + # ─── Scenario 3: Insufficient balance ────────────────────────────────────── + + Scenario: Withdrawal rejected when balance is insufficient + Given the wallet has a balance of 50.00 + When a withdrawal of 100.00 is attempted + Then the withdrawal should be rejected with status 422 + And no new event should be appended to the transaction event store + And the wallet balance should remain 50.00 + + # ─── Scenario 4: Transaction history ─────────────────────────────────────── + + Scenario: Retrieve transaction history for a wallet with multiple transactions + Given the wallet has had the following transactions: + | type | amount | + | deposit | 200.00 | + | withdrawal | 50.00 | + | deposit | 75.00 | + When the transaction history is queried for the wallet + Then the response should contain 3 transactions + And the transactions should be ordered newest first + And the first transaction should be of type "credit" with amount 75.00 + And the second transaction should be of type "debit" with amount 50.00 + And the third transaction should be of type "credit" with amount 200.00 + + # ─── Scenario 5: Empty wallet history ────────────────────────────────────── + + Scenario: Query transaction history for a wallet with no transactions + Given a wallet exists with no prior transactions + When the transaction history is queried for that wallet + Then the response should be successful (HTTP 200) + And the transactions array should be empty + And the count should be 0 + + # ─── Scenario 6: Pagination ───────────────────────────────────────────────── + + Scenario: Paginated transaction history retrieval + Given the wallet has 25 deposits of 10.00 each + When the transaction history is queried with page=1 and limit=10 + Then the response should contain 10 transactions + And the count should be 25 + And all returned transactions should be of type "credit" + + # ─── Scenario 7: Invalid deposit amount ───────────────────────────────────── + + Scenario: Deposit rejected when amount is zero + When a deposit of 0 is attempted + Then the deposit should be rejected with status 400 + + Scenario: Deposit rejected when amount is negative + When a deposit of -50.00 is attempted + Then the deposit should be rejected with status 400 + + # ─── Scenario 8: Wallet not found ─────────────────────────────────────────── + + Scenario: Deposit fails when wallet does not exist + Given no wallet exists with the given ID + When a deposit of 100.00 is attempted for that wallet + Then the response should be 404 Not Found + + # ─── Scenario 9: Idempotent projection ────────────────────────────────────── + + Scenario: Projection handler handles duplicate events without creating duplicates + Given a "money-deposited" event with event_id "evt-001" has been processed by the projection + When the same event with event_id "evt-001" is sent to the projection again + Then the transaction_reads table should contain exactly 1 row for event_id "evt-001" + + # ─── Scenario 10: Optimistic concurrency ──────────────────────────────────── + + Scenario: Concurrent writes to the same wallet stream are rejected + Given a transaction event stream exists for a wallet at version 1 + When two concurrent writes to that stream at version 1 are attempted + Then the second write should fail with a version conflict error + And the stream should contain exactly 1 event diff --git a/tests/integration/event-sourcing/performance.spec.ts b/tests/integration/event-sourcing/performance.spec.ts new file mode 100644 index 00000000..ab3009ed --- /dev/null +++ b/tests/integration/event-sourcing/performance.spec.ts @@ -0,0 +1,113 @@ +/** + * T047b — Performance test: load 1,000 events within 500ms p95 (SC-002) + * + * Uses InMemoryEventStore so no external DB is needed. + */ +import 'reflect-metadata'; +import { AggregateRoot } from '@libs/ddd/aggregate-root.base'; +import { DomainEvent, DomainEventProps } from '@libs/ddd/domain-event.base'; +import { Aggregate } from '@libs/event-sourcing/decorators/aggregate.decorator'; +import { Event } from '@libs/event-sourcing/decorators/event.decorator'; +import { EventHandler } from '@libs/event-sourcing/decorators/event-handler.decorator'; +import { InMemoryEventStore } from '@infrastructure/event-sourcing/drivers/in-memory/in-memory.event-store'; +import { InMemorySnapshotStore } from '@infrastructure/event-sourcing/drivers/in-memory/in-memory.snapshot-store'; +import { EventStoreRepositoryBase } from '@libs/event-sourcing/repository/event-store-repository.base'; +import { EventBus } from '@libs/event-sourcing/services/event-bus'; +import { EventMap } from '@libs/event-sourcing/services/event-map'; +import { EventStream } from '@libs/event-sourcing/models/event-stream'; +import { LoggerPort } from '@libs/ports/logger.port'; + +@Event('perf-event') +class PerfEvent extends DomainEvent { + constructor(props: DomainEventProps) { + super(props); + } +} + +@Aggregate('perf-aggregate') +class PerfAggregate extends AggregateRoot { + protected _id = ''; + validate() {} + eventCount = 0; + + @EventHandler(PerfEvent) + onPerfEvent(event: PerfEvent): void { + this._id = event.aggregateId; + this.eventCount += 1; + } +} + +class PerfRepository extends EventStoreRepositoryBase { + constructor( + eventStore: InMemoryEventStore, + snapshotStore: InMemorySnapshotStore, + eventBus: EventBus, + logger: LoggerPort, + eventMap: EventMap, + ) { + super(eventStore, snapshotStore, PerfAggregate, eventBus, logger, eventMap); + } +} + +function makeLogger(): LoggerPort { + return { + log: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + }; +} + +describe('SC-002 — Performance: load 1,000 events ≤ 500ms p95', () => { + it('replays 1,000 events and p95 latency is within 500ms', async () => { + const eventMap = new EventMap(); + eventMap.register(PerfEvent); + + const eventStore = new InMemoryEventStore(eventMap); + const snapshotStore = new InMemorySnapshotStore(); + const eventBus = new EventBus(); + const logger = makeLogger(); + + await eventStore.connect(); + await snapshotStore.connect(); + + const repo = new PerfRepository( + eventStore, + snapshotStore, + eventBus, + logger, + eventMap, + ); + + // Seed 1,000 events directly into the store + const aggId = 'perf-agg-1'; + const stream = EventStream.for('perf-aggregate', aggId); + const meta = { correlationId: 'perf', timestamp: Date.now() }; + const events: DomainEvent[] = Array.from( + { length: 1000 }, + () => new PerfEvent({ aggregateId: aggId, metadata: meta }), + ); + await eventStore.appendEvents(stream, 0, events); + + // Measure 20 sequential findById calls + const durations: number[] = []; + for (let i = 0; i < 20; i++) { + const start = performance.now(); + const agg = await repo.findById(aggId); + durations.push(performance.now() - start); + expect(agg).not.toBeNull(); + expect(agg!.eventCount).toBe(1000); + } + + // p95 = 19th value (index 18) of sorted durations + durations.sort((a, b) => a - b); + const p95 = durations[Math.ceil(durations.length * 0.95) - 1]; + + console.log( + `findById(1000 events) — p95: ${p95.toFixed( + 2, + )}ms, max: ${durations[19].toFixed(2)}ms`, + ); + expect(p95).toBeLessThan(500); + }, 30_000); +}); diff --git a/tests/integration/event-sourcing/postgres-event-store.spec.ts b/tests/integration/event-sourcing/postgres-event-store.spec.ts new file mode 100644 index 00000000..ccf242e3 --- /dev/null +++ b/tests/integration/event-sourcing/postgres-event-store.spec.ts @@ -0,0 +1,173 @@ +/** + * T034 / T040 — PostgresEventStore integration test + * + * Requires a real PostgreSQL instance. Set EVENT_STORE_DATABASE_URL in the environment + * before running this suite: + * + * EVENT_STORE_DATABASE_URL=postgres://user:password@localhost:5432/ddh \ + * npx jest --config jest.integration.config.js --testPathPattern=postgres-event-store + * + * These tests are skipped automatically when EVENT_STORE_DATABASE_URL is not set. + */ +import 'reflect-metadata'; +import { createPool, sql } from 'slonik'; +import { PostgresEventStore } from '@infrastructure/event-sourcing/drivers/postgres/postgres.event-store'; +import { PostgresSnapshotStore } from '@infrastructure/event-sourcing/drivers/postgres/postgres.snapshot-store'; +import { EventMap } from '@libs/event-sourcing/services/event-map'; +import { EventStream } from '@libs/event-sourcing/models/event-stream'; +import { DomainEvent, DomainEventProps } from '@libs/ddd/domain-event.base'; +import { Event } from '@libs/event-sourcing/decorators/event.decorator'; + +const connectionUri = process.env.EVENT_STORE_DATABASE_URL; + +@Event('integration-order-placed') +class IntOrderPlacedEvent extends DomainEvent { + constructor(props: DomainEventProps) { + super(props); + } +} + +const testSuite = connectionUri ? describe : describe.skip; + +testSuite('PostgresEventStore — integration', () => { + let pool: Awaited>; + let eventStore: PostgresEventStore; + let snapshotStore: PostgresSnapshotStore; + let eventMap: EventMap; + + beforeAll(async () => { + pool = await createPool(connectionUri as string); + eventMap = new EventMap(); + eventMap.register(IntOrderPlacedEvent); + eventStore = new PostgresEventStore(pool, eventMap); + snapshotStore = new PostgresSnapshotStore(pool); + + // Apply schema using individual Slonik-compatible statements + await pool.query(sql` + CREATE TABLE IF NOT EXISTS event_store ( + id UUID NOT NULL DEFAULT gen_random_uuid(), + stream_id VARCHAR(500) NOT NULL, + aggregate_id VARCHAR(500) NOT NULL, + event VARCHAR(500) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + version INTEGER NOT NULL, + occurred_on TIMESTAMPTZ NOT NULL DEFAULT NOW(), + correlation_id VARCHAR(500), + causation_id VARCHAR(500), + pool VARCHAR(100) NOT NULL DEFAULT 'default', + PRIMARY KEY (stream_id, version) + ) + `); + + await pool.query(sql` + CREATE INDEX IF NOT EXISTS idx_event_store_aggregate_id + ON event_store (aggregate_id) + `); + + await pool.query(sql` + CREATE INDEX IF NOT EXISTS idx_event_store_occurred_on + ON event_store (occurred_on) + `); + + await pool.query(sql` + CREATE TABLE IF NOT EXISTS snapshot_store ( + id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, + stream_id VARCHAR(500) NOT NULL, + aggregate_id VARCHAR(500) NOT NULL, + aggregate_version INTEGER NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + occurred_on TIMESTAMPTZ NOT NULL DEFAULT NOW(), + pool VARCHAR(100) NOT NULL DEFAULT 'default' + ) + `); + + await pool.query(sql` + CREATE INDEX IF NOT EXISTS idx_snapshot_store_stream_version + ON snapshot_store (stream_id, aggregate_version DESC) + `); + }); + + afterAll(async () => { + await pool.end(); + }); + + const meta = { correlationId: 'test-integration', timestamp: Date.now() }; + + it('appends events and retrieves them via getEvents', async () => { + const aggId = `int-${Date.now()}`; + const stream = EventStream.for('integration-order', aggId); + const event = new IntOrderPlacedEvent({ + aggregateId: aggId, + metadata: meta, + }); + + const envelopes = await eventStore.appendEvents(stream, 0, [event]); + expect(envelopes).toHaveLength(1); + expect(envelopes[0].metadata.version).toBe(1); + expect(envelopes[0].event).toBe('integration-order-placed'); + + const replayed: DomainEvent[] = []; + for await (const e of eventStore.getEvents(stream)) { + replayed.push(e); + } + expect(replayed).toHaveLength(1); + }); + + it('throws EventStoreVersionConflictException on concurrent write', async () => { + const aggId = `conflict-${Date.now()}`; + const stream = EventStream.for('integration-order', aggId); + const event = new IntOrderPlacedEvent({ + aggregateId: aggId, + metadata: meta, + }); + + await eventStore.appendEvents(stream, 0, [event]); + + // Second write at expectedVersion 0 conflicts — stream is now at version 1 + await expect(eventStore.appendEvents(stream, 0, [event])).rejects.toThrow(); + }); + + it('returns envelopes within a date range via getAllEnvelopes', async () => { + const aggId = `range-${Date.now()}`; + const stream = EventStream.for('integration-order', aggId); + const event = new IntOrderPlacedEvent({ + aggregateId: aggId, + metadata: meta, + }); + + const before = new Date(Date.now() - 5_000); + await eventStore.appendEvents(stream, 0, [event]); + const after = new Date(Date.now() + 5_000); + + const results = await eventStore.getAllEnvelopes({ + from: before, + to: after, + }); + const mine = results.filter((e) => e.metadata.aggregateId === aggId); + expect(mine).toHaveLength(1); + }); + + it('appends and retrieves a snapshot', async () => { + const aggId = `snap-${Date.now()}`; + const stream = EventStream.for('integration-order', aggId); + + await snapshotStore.appendSnapshot(stream, 5, { + status: 'pending', + count: 5, + }); + + const snapshot = await snapshotStore.getLastSnapshot(stream); + expect(snapshot).not.toBeNull(); + expect(snapshot?.aggregateVersion).toBe(5); + expect(snapshot?.payload).toMatchObject({ status: 'pending', count: 5 }); + }); + + it('getLastSnapshot returns null when no snapshot exists', async () => { + const stream = EventStream.for( + 'integration-order', + `no-snap-${Date.now()}`, + ); + const result = await snapshotStore.getLastSnapshot(stream); + expect(result).toBeNull(); + }); +}); diff --git a/tests/test-utils/ApiClient.ts b/tests/test-utils/ApiClient.ts index b8dc0049..c94dbe06 100644 --- a/tests/test-utils/ApiClient.ts +++ b/tests/test-utils/ApiClient.ts @@ -2,10 +2,12 @@ import { routesV1 } from '@src/configs/app.routes'; import { IdResponse } from '@src/libs/api/id.response.dto'; import { CreateUserRequestDto } from '@src/modules/user/commands/create-user/create-user.request.dto'; import { UserPaginatedResponseDto } from '@src/modules/user/dtos/user.paginated.response.dto'; +import { TransactionPaginatedResponseDto } from '@src/modules/wallet/dtos/transaction.response.dto'; import { getHttpServer } from '@tests/setup/jestSetupAfterEnv'; export class ApiClient { private url = `/${routesV1.version}/${routesV1.user.root}`; + private walletsBaseUrl = `/${routesV1.version}/${routesV1.wallet.root}`; async createUser(dto: CreateUserRequestDto): Promise { const response = await getHttpServer().post(this.url).send(dto); @@ -21,4 +23,27 @@ export class ApiClient { const response = await getHttpServer().get(this.url); return response.body; } + + async depositMoney(walletId: string, amount: number): Promise { + const url = `/${routesV1.version}/${routesV1.wallet.root}/${walletId}/deposit`; + const response = await getHttpServer().post(url).send({ amount }); + return response.body; + } + + async withdrawMoney(walletId: string, amount: number): Promise { + const url = `/${routesV1.version}/${routesV1.wallet.root}/${walletId}/withdraw`; + const response = await getHttpServer().post(url).send({ amount }); + return response.body; + } + + async getTransactions( + walletId: string, + params?: { page?: number; limit?: number }, + ): Promise { + const url = `/${routesV1.version}/${routesV1.wallet.root}/${walletId}/transactions`; + const response = await getHttpServer() + .get(url) + .query(params ?? {}); + return response.body; + } } diff --git a/tests/transaction/deposit-money/deposit-money.e2e-spec.ts b/tests/transaction/deposit-money/deposit-money.e2e-spec.ts new file mode 100644 index 00000000..5593da3d --- /dev/null +++ b/tests/transaction/deposit-money/deposit-money.e2e-spec.ts @@ -0,0 +1,98 @@ +import { defineFeature, loadFeature } from 'jest-cucumber'; +import { DatabasePool, sql } from 'slonik'; +import { TestContext } from '@tests/test-utils/TestContext'; +import { ApiClient } from '@tests/test-utils/ApiClient'; +import { ApiErrorResponse } from '@src/libs/api/api-error.response'; +import { getConnectionPool } from '../../setup/jestSetupAfterEnv'; +import { + TransactionTestContext, + givenAUserExistsWithAWallet, +} from '../transaction-shared-steps'; + +const feature = loadFeature( + 'tests/transaction/deposit-money/deposit-money.feature', +); + +defineFeature(feature, (test) => { + let pool: DatabasePool; + const apiClient = new ApiClient(); + + beforeAll(() => { + pool = getConnectionPool(); + }); + + afterEach(async () => { + await pool.query(sql`TRUNCATE "users"`); + await pool.query(sql`TRUNCATE "wallets"`); + await pool.query(sql`DELETE FROM event_store WHERE pool = 'transactions'`); + await pool.query(sql`TRUNCATE transaction_reads`); + }); + + test('Successful deposit increases the wallet balance', ({ + given, + when, + then, + }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWallet(given, ctx, apiClient); + + when('I deposit 100 into the wallet', async () => { + ctx.latestResponse = await apiClient.depositMoney( + ctx.context.walletId, + 100, + ); + }); + + then('the response contains walletId and balance of 100', () => { + const response = ctx.latestResponse as { + walletId: string; + balance: number; + }; + expect(response.walletId).toBe(ctx.context.walletId); + expect(response.balance).toBe(100); + }); + }); + + test('Invalid deposit amount is rejected', ({ given, when, then }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWallet(given, ctx, apiClient); + + when(/^I deposit (-?\d+) into the wallet$/, async (amount: string) => { + ctx.latestResponse = await apiClient.depositMoney( + ctx.context.walletId, + parseInt(amount, 10), + ); + }); + + then( + /^I receive an error "(.*)" with status code (\d+)$/, + (errorMessage: string, statusCode: string) => { + const error = ctx.latestResponse as ApiErrorResponse; + expect(error.statusCode).toBe(parseInt(statusCode, 10)); + expect(error.error).toBe(errorMessage); + }, + ); + }); + + test('Deposit to a non-existent wallet returns not found', ({ + when, + then, + }) => { + const ctx = new TestContext(); + + when(/^I deposit 100 into wallet "(.*)"$/, async (walletId: string) => { + ctx.latestResponse = await apiClient.depositMoney(walletId, 100); + }); + + then( + /^I receive an error "(.*)" with status code (\d+)$/, + (errorMessage: string, statusCode: string) => { + const error = ctx.latestResponse as ApiErrorResponse; + expect(error.statusCode).toBe(parseInt(statusCode, 10)); + expect(error.error).toBe(errorMessage); + }, + ); + }); +}); diff --git a/tests/transaction/deposit-money/deposit-money.feature b/tests/transaction/deposit-money/deposit-money.feature new file mode 100644 index 00000000..2106b0e2 --- /dev/null +++ b/tests/transaction/deposit-money/deposit-money.feature @@ -0,0 +1,20 @@ +Feature: Deposit money into a wallet + + Scenario: Successful deposit increases the wallet balance + Given a user exists with a wallet + When I deposit 100 into the wallet + Then the response contains walletId and balance of 100 + + Scenario Outline: Invalid deposit amount is rejected + Given a user exists with a wallet + When I deposit into the wallet + Then I receive an error "Bad Request" with status code 400 + + Examples: + | amount | + | 0 | + | -50 | + + Scenario: Deposit to a non-existent wallet returns not found + When I deposit 100 into wallet "00000000-0000-0000-0000-000000000000" + Then I receive an error "Not Found" with status code 404 diff --git a/tests/transaction/get-transactions/get-transactions.e2e-spec.ts b/tests/transaction/get-transactions/get-transactions.e2e-spec.ts new file mode 100644 index 00000000..9264457a --- /dev/null +++ b/tests/transaction/get-transactions/get-transactions.e2e-spec.ts @@ -0,0 +1,153 @@ +import { defineFeature, loadFeature } from 'jest-cucumber'; +import { DatabasePool, sql } from 'slonik'; +import { TestContext } from '@tests/test-utils/TestContext'; +import { ApiClient } from '@tests/test-utils/ApiClient'; +import { TransactionPaginatedResponseDto } from '@src/modules/wallet/dtos/transaction.response.dto'; +import { getConnectionPool } from '../../setup/jestSetupAfterEnv'; +import { + TransactionTestContext, + givenAUserExistsWithAWallet, + waitForProjection, +} from '../transaction-shared-steps'; + +const feature = loadFeature( + 'tests/transaction/get-transactions/get-transactions.feature', +); + +defineFeature(feature, (test) => { + let pool: DatabasePool; + const apiClient = new ApiClient(); + + beforeAll(() => { + pool = getConnectionPool(); + }); + + afterEach(async () => { + await pool.query(sql`TRUNCATE "users"`); + await pool.query(sql`TRUNCATE "wallets"`); + await pool.query(sql`DELETE FROM event_store WHERE pool = 'transactions'`); + await pool.query(sql`TRUNCATE transaction_reads`); + }); + + test('Wallet with no transactions returns an empty list', ({ + given, + when, + then, + }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWallet(given, ctx, apiClient); + + when('I get the transaction history for the wallet', async () => { + ctx.latestResponse = await apiClient.getTransactions( + ctx.context.walletId, + ); + }); + + then('the response contains 0 transactions', () => { + const response = ctx.latestResponse as TransactionPaginatedResponseDto; + expect(response.transactions).toHaveLength(0); + expect(response.count).toBe(0); + }); + }); + + test('Transaction history is returned newest first', ({ + given, + when, + then, + and, + }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWallet(given, ctx, apiClient); + + and('the wallet has a deposit of 100', async () => { + await apiClient.depositMoney(ctx.context.walletId, 100); + }); + + and('the wallet has a withdrawal of 30', async () => { + await apiClient.withdrawMoney(ctx.context.walletId, 30); + // Wait for the projection handler to write both events to transaction_reads + await waitForProjection(); + }); + + when('I get the transaction history for the wallet', async () => { + ctx.latestResponse = await apiClient.getTransactions( + ctx.context.walletId, + ); + }); + + then('the response contains 2 transactions', () => { + const response = ctx.latestResponse as TransactionPaginatedResponseDto; + expect(response.transactions).toHaveLength(2); + expect(response.count).toBe(2); + }); + + and('the first transaction is a debit for 30', () => { + const response = ctx.latestResponse as TransactionPaginatedResponseDto; + expect(response.transactions[0].type).toBe('debit'); + expect(response.transactions[0].amount).toBe(30); + }); + + and('the second transaction is a credit for 100', () => { + const response = ctx.latestResponse as TransactionPaginatedResponseDto; + expect(response.transactions[1].type).toBe('credit'); + expect(response.transactions[1].amount).toBe(100); + }); + }); + + test('Transaction history supports pagination', ({ + given, + when, + then, + and, + }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWallet(given, ctx, apiClient); + + and('the wallet has 3 deposits of 10', async () => { + await apiClient.depositMoney(ctx.context.walletId, 10); + await apiClient.depositMoney(ctx.context.walletId, 10); + await apiClient.depositMoney(ctx.context.walletId, 10); + await waitForProjection(); + }); + + when('I get page 1 with limit 2 of the transaction history', async () => { + ctx.latestResponse = await apiClient.getTransactions( + ctx.context.walletId, + { page: 1, limit: 2 }, + ); + }); + + then('the response contains 2 transactions', () => { + const response = ctx.latestResponse as TransactionPaginatedResponseDto; + expect(response.transactions).toHaveLength(2); + }); + + and('the total count is 3', () => { + const response = ctx.latestResponse as TransactionPaginatedResponseDto; + expect(response.count).toBe(3); + }); + }); + + test('Querying transactions for an unknown wallet returns an empty list', ({ + when, + then, + }) => { + const ctx = new TestContext(); + + when( + /^I get the transaction history for wallet "(.*)"$/, + async (walletId: string) => { + ctx.latestResponse = await apiClient.getTransactions(walletId); + }, + ); + + then('the response contains 0 transactions', () => { + const response = ctx.latestResponse as TransactionPaginatedResponseDto; + expect(response.transactions).toHaveLength(0); + expect(response.count).toBe(0); + }); + }); +}); diff --git a/tests/transaction/get-transactions/get-transactions.feature b/tests/transaction/get-transactions/get-transactions.feature new file mode 100644 index 00000000..c616bffc --- /dev/null +++ b/tests/transaction/get-transactions/get-transactions.feature @@ -0,0 +1,26 @@ +Feature: Get wallet transaction history + + Scenario: Wallet with no transactions returns an empty list + Given a user exists with a wallet + When I get the transaction history for the wallet + Then the response contains 0 transactions + + Scenario: Transaction history is returned newest first + Given a user exists with a wallet + And the wallet has a deposit of 100 + And the wallet has a withdrawal of 30 + When I get the transaction history for the wallet + Then the response contains 2 transactions + And the first transaction is a debit for 30 + And the second transaction is a credit for 100 + + Scenario: Transaction history supports pagination + Given a user exists with a wallet + And the wallet has 3 deposits of 10 + When I get page 1 with limit 2 of the transaction history + Then the response contains 2 transactions + And the total count is 3 + + Scenario: Querying transactions for an unknown wallet returns an empty list + When I get the transaction history for wallet "00000000-0000-0000-0000-000000000000" + Then the response contains 0 transactions diff --git a/tests/transaction/transaction-shared-steps.ts b/tests/transaction/transaction-shared-steps.ts new file mode 100644 index 00000000..b903469b --- /dev/null +++ b/tests/transaction/transaction-shared-steps.ts @@ -0,0 +1,87 @@ +import { DefineStepFunction } from 'jest-cucumber'; +import { sql } from 'slonik'; +import { TestContext } from '@tests/test-utils/TestContext'; +import { ApiClient } from '@tests/test-utils/ApiClient'; +import { IdResponse } from '@src/libs/api/id.response.dto'; +import { getConnectionPool } from '../setup/jestSetupAfterEnv'; + +export type TransactionTestContext = { + walletId: string; + userId: string; +}; + +/** + * Creates a user (which auto-creates a wallet with balance 0) + * and stores the walletId in the test context. + * Uses getConnectionPool() lazily so it runs after beforeAll. + */ +export const givenAUserExistsWithAWallet = ( + given: DefineStepFunction, + ctx: TestContext, + apiClient: ApiClient, +): void => { + given('a user exists with a wallet', async () => { + const pool = getConnectionPool(); + + const createResponse = (await apiClient.createUser({ + email: `txtest-${Date.now()}@example.com`, + country: 'England', + street: 'Test Street', + postalCode: '12345', + })) as IdResponse; + + ctx.context.userId = createResponse.id; + + // Wait for WalletProjectionHandler to INSERT the wallet row (async EventBus) + await waitForProjection(); + + const result = await pool.query( + sql`SELECT id FROM wallets WHERE "userId" = ${createResponse.id} LIMIT 1`, + ); + ctx.context.walletId = (result.rows[0] as any).id; + }); +}; + +/** + * Creates a user, then deposits an initial balance into the wallet. + * Balance amount is extracted from the Gherkin step text. + */ +export const givenAUserExistsWithAWalletAndBalance = ( + given: DefineStepFunction, + ctx: TestContext, + apiClient: ApiClient, +): void => { + given( + /^a user exists with a wallet and balance (\d+)$/, + async (balance: string) => { + const pool = getConnectionPool(); + + const createResponse = (await apiClient.createUser({ + email: `txtest-${Date.now()}@example.com`, + country: 'England', + street: 'Test Street', + postalCode: '12345', + })) as IdResponse; + + ctx.context.userId = createResponse.id; + + // Wait for WalletProjectionHandler to INSERT the wallet row (async EventBus) + await waitForProjection(); + + const result = await pool.query( + sql`SELECT id FROM wallets WHERE "userId" = ${createResponse.id} LIMIT 1`, + ); + ctx.context.walletId = (result.rows[0] as any).id; + + await apiClient.depositMoney(ctx.context.walletId, parseInt(balance, 10)); + }, + ); +}; + +/** + * Waits for the in-process projection handler to finish writing + * to transaction_reads after an async fire-and-forget event publish. + */ +export function waitForProjection(ms = 150): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/tests/transaction/withdraw-money/withdraw-money.e2e-spec.ts b/tests/transaction/withdraw-money/withdraw-money.e2e-spec.ts new file mode 100644 index 00000000..33b976dc --- /dev/null +++ b/tests/transaction/withdraw-money/withdraw-money.e2e-spec.ts @@ -0,0 +1,124 @@ +import { defineFeature, loadFeature } from 'jest-cucumber'; +import { DatabasePool, sql } from 'slonik'; +import { TestContext } from '@tests/test-utils/TestContext'; +import { ApiClient } from '@tests/test-utils/ApiClient'; +import { ApiErrorResponse } from '@src/libs/api/api-error.response'; +import { getConnectionPool } from '../../setup/jestSetupAfterEnv'; +import { + TransactionTestContext, + givenAUserExistsWithAWalletAndBalance, +} from '../transaction-shared-steps'; + +const feature = loadFeature( + 'tests/transaction/withdraw-money/withdraw-money.feature', +); + +defineFeature(feature, (test) => { + let pool: DatabasePool; + const apiClient = new ApiClient(); + + beforeAll(() => { + pool = getConnectionPool(); + }); + + afterEach(async () => { + await pool.query(sql`TRUNCATE "users"`); + await pool.query(sql`TRUNCATE "wallets"`); + await pool.query(sql`DELETE FROM event_store WHERE pool = 'transactions'`); + await pool.query(sql`TRUNCATE transaction_reads`); + }); + + test('Successful withdrawal decreases the wallet balance', ({ + given, + when, + then, + }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWalletAndBalance(given, ctx, apiClient); + + when('I withdraw 50 from the wallet', async () => { + ctx.latestResponse = await apiClient.withdrawMoney( + ctx.context.walletId, + 50, + ); + }); + + then('the response contains walletId and balance of 150', () => { + const response = ctx.latestResponse as { + walletId: string; + balance: number; + }; + expect(response.walletId).toBe(ctx.context.walletId); + expect(response.balance).toBe(150); + }); + }); + + test('Withdrawal with insufficient balance is rejected', ({ + given, + when, + then, + }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWalletAndBalance(given, ctx, apiClient); + + when('I withdraw 100 from the wallet', async () => { + ctx.latestResponse = await apiClient.withdrawMoney( + ctx.context.walletId, + 100, + ); + }); + + then( + /^I receive an error "(.*)" with status code (\d+)$/, + (errorMessage: string, statusCode: string) => { + const error = ctx.latestResponse as ApiErrorResponse; + expect(error.statusCode).toBe(parseInt(statusCode, 10)); + expect(error.error).toBe(errorMessage); + }, + ); + }); + + test('Invalid withdrawal amount is rejected', ({ given, when, then }) => { + const ctx = new TestContext(); + + givenAUserExistsWithAWalletAndBalance(given, ctx, apiClient); + + when(/^I withdraw (-?\d+) from the wallet$/, async (amount: string) => { + ctx.latestResponse = await apiClient.withdrawMoney( + ctx.context.walletId, + parseInt(amount, 10), + ); + }); + + then( + /^I receive an error "(.*)" with status code (\d+)$/, + (errorMessage: string, statusCode: string) => { + const error = ctx.latestResponse as ApiErrorResponse; + expect(error.statusCode).toBe(parseInt(statusCode, 10)); + expect(error.error).toBe(errorMessage); + }, + ); + }); + + test('Withdrawal from a non-existent wallet returns not found', ({ + when, + then, + }) => { + const ctx = new TestContext(); + + when(/^I withdraw 50 from wallet "(.*)"$/, async (walletId: string) => { + ctx.latestResponse = await apiClient.withdrawMoney(walletId, 50); + }); + + then( + /^I receive an error "(.*)" with status code (\d+)$/, + (errorMessage: string, statusCode: string) => { + const error = ctx.latestResponse as ApiErrorResponse; + expect(error.statusCode).toBe(parseInt(statusCode, 10)); + expect(error.error).toBe(errorMessage); + }, + ); + }); +}); diff --git a/tests/transaction/withdraw-money/withdraw-money.feature b/tests/transaction/withdraw-money/withdraw-money.feature new file mode 100644 index 00000000..c170989e --- /dev/null +++ b/tests/transaction/withdraw-money/withdraw-money.feature @@ -0,0 +1,25 @@ +Feature: Withdraw money from a wallet + + Scenario: Successful withdrawal decreases the wallet balance + Given a user exists with a wallet and balance 200 + When I withdraw 50 from the wallet + Then the response contains walletId and balance of 150 + + Scenario: Withdrawal with insufficient balance is rejected + Given a user exists with a wallet and balance 50 + When I withdraw 100 from the wallet + Then I receive an error "Unprocessable Entity" with status code 422 + + Scenario Outline: Invalid withdrawal amount is rejected + Given a user exists with a wallet and balance 100 + When I withdraw from the wallet + Then I receive an error "Bad Request" with status code 400 + + Examples: + | amount | + | 0 | + | -10 | + + Scenario: Withdrawal from a non-existent wallet returns not found + When I withdraw 50 from wallet "00000000-0000-0000-0000-000000000000" + Then I receive an error "Not Found" with status code 404 diff --git a/tests/unit/libs/ddd/aggregate-root.base.spec.ts b/tests/unit/libs/ddd/aggregate-root.base.spec.ts new file mode 100644 index 00000000..9ba3356c --- /dev/null +++ b/tests/unit/libs/ddd/aggregate-root.base.spec.ts @@ -0,0 +1,159 @@ +import 'reflect-metadata'; +import { AggregateRoot } from '@libs/ddd/aggregate-root.base'; +import { DomainEvent, DomainEventProps } from '@libs/ddd/domain-event.base'; +import { Aggregate } from '@libs/event-sourcing/decorators/aggregate.decorator'; +import { Event } from '@libs/event-sourcing/decorators/event.decorator'; +import { EventHandler } from '@libs/event-sourcing/decorators/event-handler.decorator'; + +// ─── Test domain events ────────────────────────────────────────────────────── + +@Event('test-happened') +class TestHappenedEvent extends DomainEvent { + readonly value: string; + + constructor(props: DomainEventProps) { + super(props); + this.value = (props as any).value; + } +} + +@Event('other-happened') +class OtherHappenedEvent extends DomainEvent { + constructor(props: DomainEventProps) { + super(props); + } +} + +// ─── Test aggregate ─────────────────────────────────────────────────────────── + +@Aggregate('test-aggregate') +class TestAggregate extends AggregateRoot { + protected _id = ''; + validate() {} + + appliedValue: string | undefined; + otherApplied = false; + + @EventHandler(TestHappenedEvent) + onTestHappened(event: TestHappenedEvent): void { + this.appliedValue = event.value; + } + + @EventHandler(OtherHappenedEvent) + onOtherHappened(_event: OtherHappenedEvent): void { + this.otherApplied = true; + } +} + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +const makeAggregate = (): TestAggregate => new TestAggregate(); + +const makeEvent = (aggregateId = 'agg-1', value = 'hello'): TestHappenedEvent => + new TestHappenedEvent({ + aggregateId, + value, + metadata: { correlationId: 'test-corr', timestamp: Date.now() }, + } as any); + +describe('AggregateRoot — event-sourcing extensions', () => { + describe('version', () => { + it('starts at 0', () => { + const agg = makeAggregate(); + expect(agg.version).toBe(0); + }); + }); + + describe('applyEvent()', () => { + it('increments version by 1 per event', () => { + const agg = makeAggregate(); + agg.applyEvent(makeEvent()); + expect(agg.version).toBe(1); + agg.applyEvent(makeEvent()); + expect(agg.version).toBe(2); + }); + + it('dispatches to the correct @EventHandler method', () => { + const agg = makeAggregate(); + agg.applyEvent(makeEvent('agg-1', 'world')); + expect(agg.appliedValue).toBe('world'); + }); + + it('adds event to uncommitted events when fromHistory=false (default)', () => { + const agg = makeAggregate(); + agg.applyEvent(makeEvent()); + expect(agg.domainEvents).toHaveLength(1); + }); + + it('does NOT add event to uncommitted events when fromHistory=true', () => { + const agg = makeAggregate(); + agg.applyEvent(makeEvent(), true); + expect(agg.domainEvents).toHaveLength(0); + }); + + it('dispatches to the matching handler by constructor', () => { + const agg = makeAggregate(); + agg.applyEvent( + new OtherHappenedEvent({ + aggregateId: 'agg-1', + metadata: { correlationId: 'test-corr', timestamp: Date.now() }, + }), + ); + expect(agg.otherApplied).toBe(true); + }); + }); + + describe('commit()', () => { + it('returns all uncommitted events', () => { + const agg = makeAggregate(); + const e1 = makeEvent(); + const e2 = makeEvent(); + agg.applyEvent(e1); + agg.applyEvent(e2); + const committed = agg.commit(); + expect(committed).toHaveLength(2); + }); + + it('clears uncommitted events after commit', () => { + const agg = makeAggregate(); + agg.applyEvent(makeEvent()); + agg.commit(); + expect(agg.domainEvents).toHaveLength(0); + }); + + it('returns empty array when there are no uncommitted events', () => { + const agg = makeAggregate(); + expect(agg.commit()).toHaveLength(0); + }); + }); + + describe('loadFromHistory()', () => { + async function* makeEventStream( + events: DomainEvent[], + ): AsyncGenerator { + for (const e of events) yield e; + } + + it('replays all events incrementing version', async () => { + const agg = makeAggregate(); + const events = [makeEvent('agg-1', 'a'), makeEvent('agg-1', 'b')]; + await agg.loadFromHistory(makeEventStream(events)); + expect(agg.version).toBe(2); + }); + + it('applies events with fromHistory=true (no uncommitted events added)', async () => { + const agg = makeAggregate(); + await agg.loadFromHistory( + makeEventStream([makeEvent('agg-1', 'replay')]), + ); + expect(agg.appliedValue).toBe('replay'); + expect(agg.domainEvents).toHaveLength(0); + }); + + it('is a no-op on empty stream', async () => { + const agg = makeAggregate(); + await agg.loadFromHistory(makeEventStream([])); + expect(agg.version).toBe(0); + }); + }); +}); diff --git a/tests/unit/libs/event-sourcing/event-store-repository.base.spec.ts b/tests/unit/libs/event-sourcing/event-store-repository.base.spec.ts new file mode 100644 index 00000000..5a5ff8a3 --- /dev/null +++ b/tests/unit/libs/event-sourcing/event-store-repository.base.spec.ts @@ -0,0 +1,356 @@ +import 'reflect-metadata'; +import { AggregateRoot } from '@libs/ddd/aggregate-root.base'; +import { DomainEvent, DomainEventProps } from '@libs/ddd/domain-event.base'; +import { Aggregate } from '@libs/event-sourcing/decorators/aggregate.decorator'; +import { Event } from '@libs/event-sourcing/decorators/event.decorator'; +import { EventHandler } from '@libs/event-sourcing/decorators/event-handler.decorator'; +import { InMemoryEventStore } from '@infrastructure/event-sourcing/drivers/in-memory/in-memory.event-store'; +import { InMemorySnapshotStore } from '@infrastructure/event-sourcing/drivers/in-memory/in-memory.snapshot-store'; +import { EventStoreRepositoryBase } from '@libs/event-sourcing/repository/event-store-repository.base'; +import { EventBus } from '@libs/event-sourcing/services/event-bus'; +import { EventMap } from '@libs/event-sourcing/services/event-map'; +import { EventEnvelope } from '@libs/event-sourcing/models/event-envelope'; +import { EventStream } from '@libs/event-sourcing/models/event-stream'; +import { LoggerPort } from '@libs/ports/logger.port'; + +// ─── Test domain events ────────────────────────────────────────────────────── + +@Event('order-placed') +class OrderPlacedEvent extends DomainEvent { + readonly orderId: string; + + constructor(props: DomainEventProps) { + super(props); + this.orderId = (props as any).orderId; + } +} + +@Event('order-confirmed') +class OrderConfirmedEvent extends DomainEvent { + constructor(props: DomainEventProps) { + super(props); + } +} + +// ─── Test aggregate ─────────────────────────────────────────────────────────── + +@Aggregate('order') +class OrderAggregate extends AggregateRoot { + protected _id = ''; + validate() {} + + status: 'pending' | 'confirmed' = 'pending'; + appliedCount = 0; + + static place(orderId: string): OrderAggregate { + const agg = new OrderAggregate(); + agg.applyEvent( + new OrderPlacedEvent({ + aggregateId: orderId, + orderId, + metadata: { correlationId: 'test-corr', timestamp: Date.now() }, + } as any), + ); + return agg; + } + + @EventHandler(OrderPlacedEvent) + onOrderPlaced(event: OrderPlacedEvent): void { + this._id = event.aggregateId; + this.status = 'pending'; + this.appliedCount += 1; + } + + @EventHandler(OrderConfirmedEvent) + onOrderConfirmed(_event: OrderConfirmedEvent): void { + this.status = 'confirmed'; + this.appliedCount += 1; + } +} + +// ─── Test repository ───────────────────────────────────────────────────────── + +class OrderRepository extends EventStoreRepositoryBase { + constructor( + eventStore: InMemoryEventStore, + snapshotStore: InMemorySnapshotStore, + eventBus: EventBus, + logger: LoggerPort, + eventMap: EventMap, + ) { + super( + eventStore, + snapshotStore, + OrderAggregate, + eventBus, + logger, + eventMap, + ); + } +} + +// ─── Helpers ───────────────────────────────────────────────────────────────── + +function makeLogger(): LoggerPort { + return { + log: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + }; +} + +async function makeRepo(overrides?: { + eventStore?: InMemoryEventStore; + snapshotStore?: InMemorySnapshotStore; + eventBus?: EventBus; + logger?: LoggerPort; +}): Promise<{ + repo: OrderRepository; + eventStore: InMemoryEventStore; + snapshotStore: InMemorySnapshotStore; + eventBus: EventBus; + logger: LoggerPort; + eventMap: EventMap; +}> { + const eventMap = new EventMap(); + eventMap.register(OrderPlacedEvent); + eventMap.register(OrderConfirmedEvent); + + const eventStore = overrides?.eventStore ?? new InMemoryEventStore(eventMap); + const snapshotStore = overrides?.snapshotStore ?? new InMemorySnapshotStore(); + const eventBus = overrides?.eventBus ?? new EventBus(); + const logger = overrides?.logger ?? makeLogger(); + + await eventStore.connect(); + await snapshotStore.connect(); + + const repo = new OrderRepository( + eventStore, + snapshotStore, + eventBus, + logger, + eventMap, + ); + return { repo, eventStore, snapshotStore, eventBus, logger, eventMap }; +} + +// ─── US1: Persist Domain Events ─────────────────────────────────────────────── + +describe('US1 — save()', () => { + it('T020 — saves uncommitted events to the event store', async () => { + const { repo, eventStore } = await makeRepo(); + const order = OrderAggregate.place('order-1'); + await repo.save(order); + + const stream = EventStream.for('order', 'order-1'); + const stored = eventStore.getStoredEnvelopes(stream.streamId); + expect(stored).toHaveLength(1); + expect(stored[0].event).toBe('order-placed'); + expect(stored[0].metadata.version).toBe(1); + expect(stored[0].metadata.aggregateId).toBe('order-1'); + }); + + it('T021 — no-op when aggregate has no uncommitted events', async () => { + const { repo, eventStore } = await makeRepo(); + const order = new OrderAggregate(); + + await expect(repo.save(order)).resolves.toBeUndefined(); + + // No streams should exist + const stream = EventStream.for('order', ''); + expect(eventStore.getStoredEnvelopes(stream.streamId)).toHaveLength(0); + }); + + it('T025 — version conflict: throws and stream remains unchanged', async () => { + const { repo, eventStore } = await makeRepo(); + + // Save once → stream is at version 1 + const order = OrderAggregate.place('order-2'); + await repo.save(order); + + const stream = EventStream.for('order', 'order-2'); + expect(eventStore.getStoredEnvelopes(stream.streamId)).toHaveLength(1); + + // A second aggregate starting from version 0 conflicts with a stream at version 1 + const staleAggregate = OrderAggregate.place('order-2'); + // staleAggregate.version = 1, events.length = 1 → expectedVersion = 0 + // but stream is at 1 → conflict + await expect(repo.save(staleAggregate)).rejects.toThrow(); + + // Stream still has only the original 1 event — no partial write + expect(eventStore.getStoredEnvelopes(stream.streamId)).toHaveLength(1); + }); + + it('clears uncommitted events after save', async () => { + const { repo } = await makeRepo(); + const order = OrderAggregate.place('order-3'); + await repo.save(order); + expect(order.domainEvents).toHaveLength(0); + }); +}); + +// ─── US2: Reconstruct Aggregate State ──────────────────────────────────────── + +describe('US2 — findById()', () => { + it('T026 — reconstructs aggregate by replaying full event history', async () => { + const { repo } = await makeRepo(); + + const order = OrderAggregate.place('order-10'); + order.applyEvent( + new OrderConfirmedEvent({ + aggregateId: 'order-10', + metadata: { correlationId: 'test-corr', timestamp: Date.now() }, + }), + ); + await repo.save(order); + + const loaded = await repo.findById('order-10'); + expect(loaded).not.toBeNull(); + expect(loaded!.status).toBe('confirmed'); + expect(loaded!.version).toBe(2); + expect(loaded!.appliedCount).toBe(2); + }); + + it('T027 — returns null for unknown aggregate id', async () => { + const { repo } = await makeRepo(); + const result = await repo.findById('does-not-exist'); + expect(result).toBeNull(); + }); + + it('T028 — uses snapshot as baseline and replays only tail events', async () => { + const { repo, snapshotStore } = await makeRepo(); + + // Save an aggregate with 1 event so the stream exists at version 1 + const order = OrderAggregate.place('order-20'); + await repo.save(order); + + // Write a snapshot claiming version 1 (what we just saved) + const stream = EventStream.for('order', 'order-20'); + await snapshotStore.appendSnapshot(stream, 1, { + _id: 'order-20', + status: 'pending', + appliedCount: 1, + version: 1, + }); + + // Now apply and save a second event (version 2) + const meta = { correlationId: 'test-corr', timestamp: Date.now() }; + order.applyEvent( + new OrderConfirmedEvent({ aggregateId: 'order-20', metadata: meta }), + ); + await repo.save(order); + + // findById should restore from snapshot (v1) then replay 1 tail event → version 2 + const loaded = await repo.findById('order-20'); + expect(loaded).not.toBeNull(); + expect(loaded!.version).toBe(2); + }); +}); + +// ─── US3: Query Event History ───────────────────────────────────────────────── + +describe('US3 — findEnvelopes()', () => { + it('T032 — returns envelopes in date range across multiple aggregates', async () => { + const { repo, eventStore } = await makeRepo(); + + const early = new Date('2024-01-01T00:00:00Z'); + const late = new Date('2024-12-31T00:00:00Z'); + + const s1 = EventStream.for('order', 'agg-A'); + const s2 = EventStream.for('order', 'agg-B'); + + const earlyEvent = new OrderPlacedEvent({ + aggregateId: 'agg-A', + orderId: 'agg-A', + metadata: { correlationId: 'c1', timestamp: early.getTime() }, + } as any); + const lateEvent = new OrderPlacedEvent({ + aggregateId: 'agg-B', + orderId: 'agg-B', + metadata: { correlationId: 'c2', timestamp: late.getTime() }, + } as any); + + await eventStore.appendEvents(s1, 0, [earlyEvent]); + await eventStore.appendEvents(s2, 0, [lateEvent]); + + const results = await repo.findEnvelopes({ + from: new Date('2023-01-01'), + to: new Date('2024-06-01'), + }); + + expect(results).toHaveLength(1); + expect(results[0].metadata.aggregateId).toBe('agg-A'); + }); + + it('T033 — returns empty array when no events match filter', async () => { + const { repo } = await makeRepo(); + const results = await repo.findEnvelopes({ + from: new Date('2099-01-01'), + to: new Date('2099-12-31'), + }); + expect(results).toHaveLength(0); + }); +}); + +// ─── US4: Automatic Event Publishing ───────────────────────────────────────── + +describe('US4 — EventBus publishing', () => { + it('T041 — publishes envelopes to event bus after successful save', async () => { + const { repo, eventBus } = await makeRepo(); + const published: EventEnvelope[] = []; + eventBus.subscribe((e) => published.push(e)); + + const order = OrderAggregate.place('order-30'); + await repo.save(order); + + expect(published).toHaveLength(1); + expect(published[0].event).toBe('order-placed'); + }); + + it('T042 — does not publish when appendEvents throws', async () => { + const { eventBus, snapshotStore, logger, eventMap } = await makeRepo(); + + const failingStore = new InMemoryEventStore(eventMap); + await failingStore.connect(); + jest + .spyOn(failingStore, 'appendEvents') + .mockRejectedValue(new Error('store failure')); + + const repo = new OrderRepository( + failingStore, + snapshotStore, + eventBus, + logger, + eventMap, + ); + const published: EventEnvelope[] = []; + eventBus.subscribe((e) => published.push(e)); + + const order = OrderAggregate.place('order-31'); + await expect(repo.save(order)).rejects.toThrow('store failure'); + expect(published).toHaveLength(0); + }); + + it('T042b — logs error and returns successfully when EventBus.publish() throws', async () => { + const { eventBus, snapshotStore, logger, eventMap } = await makeRepo(); + + jest.spyOn(eventBus, 'publish').mockImplementation(() => { + throw new Error('bus failure'); + }); + + const eventStore = new InMemoryEventStore(eventMap); + await eventStore.connect(); + const repo = new OrderRepository( + eventStore, + snapshotStore, + eventBus, + logger, + eventMap, + ); + + const order = OrderAggregate.place('order-32'); + await expect(repo.save(order)).resolves.toBeUndefined(); + expect(logger.error).toHaveBeenCalled(); + }); +}); diff --git a/tsconfig.build.json b/tsconfig.build.json index 64f86c6b..bf827e97 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -1,4 +1,4 @@ { "extends": "./tsconfig.json", - "exclude": ["node_modules", "test", "dist", "**/*spec.ts"] + "exclude": ["node_modules", "ref", "test", "dist", "**/*spec.ts"] } diff --git a/tsconfig.json b/tsconfig.json index 6c46f2fd..49767dd2 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -19,12 +19,15 @@ "strictBindCallApply": false, "forceConsistentCasingInFileNames": false, "noFallthroughCasesInSwitch": false, + "types": ["jest", "node"], "paths": { "@src/*": ["src/*"], "@modules/*": ["src/modules/*"], "@config/*": ["src/configs/*"], "@libs/*": ["src/libs/*"], + "@infrastructure/*": ["src/infrastructure/*"], "@tests/*": ["tests/*"] } - } + }, + "exclude": ["ref", "node_modules", "dist"] }