Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .dependency-cruiser.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,39 @@ module.exports = {
},
},

{
name: 'no-direct-event-store-driver-import',
comment:
'Event store drivers live in src/infrastructure/event-sourcing/drivers/. ' +
'Only src/infrastructure/event-sourcing/ itself and tests/ may import from there. ' +
'Feature modules and application code must use the IEventStore / ISnapshotStore ports.',
severity: 'error',
from: {
pathNot: [
'src/infrastructure/event-sourcing/',
'src/app\\.module\\.ts$',
'tests/',
],
},
to: {
path: 'src/infrastructure/event-sourcing/drivers/',
},
},
{
name: 'no-libs-event-sourcing-imports-infrastructure',
comment:
'src/libs/event-sourcing/ is a pure domain/application library. ' +
'It must NOT import from src/infrastructure/. ' +
'Adapters are injected at runtime via NestJS DI — never imported directly.',
severity: 'error',
from: {
path: 'src/libs/event-sourcing/',
},
to: {
path: 'src/infrastructure/',
},
},

/* rules from the 'recommended' preset: */
// {
// name: 'no-circular',
Expand Down
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@ DB_PORT=5432
DB_USERNAME='user'
DB_PASSWORD='password'
DB_NAME='ddh'

# Event Store — separate connection pool for the event-sourcing layer.
# Can point to the same PostgreSQL instance as the main DB using a different schema or DB name.
# Set NODE_ENV=test to use the in-memory driver automatically (no DB needed).
EVENT_STORE_DATABASE_URL='postgres://user:password@localhost:5432/ddh'
4 changes: 2 additions & 2 deletions .env.test
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# env file for e2e testing
DB_HOST='localhost'
DB_PORT=5432
DB_USERNAME='user'
DB_PASSWORD='password'
DB_NAME='ddh_tests' # running tests in a separate test db
DB_NAME='ddh_test'
EVENT_STORE_DATABASE_URL='postgres://user:password@localhost:5432/ddh_test'
1 change: 1 addition & 0 deletions jest-e2e.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"@modules/(.*)$": "<rootDir>/src/modules/$1",
"@config/(.*)$": "<rootDir>/src/configs/$1",
"@libs/(.*)$": "<rootDir>/src/libs/$1",
"@infrastructure/(.*)$": "<rootDir>/src/infrastructure/$1",
"@exceptions$": "<rootDir>/src/libs/exceptions",
"@tests/(.*)$": "<rootDir>/tests/$1"
},
Expand Down
19 changes: 19 additions & 0 deletions jest.integration.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
moduleFileExtensions: ['js', 'json', 'ts'],
rootDir: '.',
testMatch: ['<rootDir>/tests/integration/**/*.spec.ts'],
transform: {
'^.+\\.(t|j)s$': ['ts-jest', { tsconfig: 'tsconfig.json' }],
},
testEnvironment: 'node',
testTimeout: 30000,
moduleNameMapper: {
'^@src/(.*)$': '<rootDir>/src/$1',
'^@libs/(.*)$': '<rootDir>/src/libs/$1',
'^@modules/(.*)$': '<rootDir>/src/modules/$1',
'^@config/(.*)$': '<rootDir>/src/configs/$1',
'^@infrastructure/(.*)$': '<rootDir>/src/infrastructure/$1',
'^@tests/(.*)$': '<rootDir>/tests/$1',
},
};
20 changes: 20 additions & 0 deletions jest.unit.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
moduleFileExtensions: ['js', 'json', 'ts'],
rootDir: '.',
testMatch: ['<rootDir>/tests/unit/**/*.spec.ts'],
transform: {
'^.+\\.(t|j)s$': ['ts-jest', { tsconfig: 'tsconfig.json' }],
},
collectCoverageFrom: ['src/**/*.(t|j)s'],
coverageDirectory: './coverage',
testEnvironment: 'node',
moduleNameMapper: {
'^@src/(.*)$': '<rootDir>/src/$1',
'^@libs/(.*)$': '<rootDir>/src/libs/$1',
'^@modules/(.*)$': '<rootDir>/src/modules/$1',
'^@config/(.*)$': '<rootDir>/src/configs/$1',
'^@infrastructure/(.*)$': '<rootDir>/src/infrastructure/$1',
'^@tests/(.*)$': '<rootDir>/tests/$1',
},
};
19 changes: 19 additions & 0 deletions migrations/transaction_reads.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Migration: transaction_reads
-- Feature: transaction-event-sourcing-projection
-- Creates the read-model table for wallet transaction history.
-- event_store and snapshot_store tables are created by the event-sourcing infrastructure (feature 001).

CREATE TABLE IF NOT EXISTS transaction_reads (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_id UUID NOT NULL,
wallet_id UUID NOT NULL,
type VARCHAR(10) NOT NULL CHECK (type IN ('credit', 'debit')),
amount NUMERIC(18, 2) NOT NULL CHECK (amount > 0),
balance_after NUMERIC(18, 2) NOT NULL,
occurred_on TIMESTAMPTZ NOT NULL,
CONSTRAINT uq_transaction_reads_event_id UNIQUE (event_id)
);

-- Primary query pattern: per-wallet history newest-first
CREATE INDEX IF NOT EXISTS idx_transaction_reads_wallet_id_occurred_on
ON transaction_reads (wallet_id, occurred_on DESC);
15 changes: 15 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import { ExceptionInterceptor } from '@libs/application/interceptors/exception.i
import { postgresConnectionUri } from './configs/database.config';
import { GraphQLModule } from '@nestjs/graphql';
import { ApolloDriver, ApolloDriverConfig } from '@nestjs/apollo';
import {
EventSourcingModule,
EVENT_STORE,
SNAPSHOT_STORE,
} from '@libs/event-sourcing';
import { PostgresEventStore } from '@infrastructure/event-sourcing/drivers/postgres/postgres.event-store';
import { PostgresSnapshotStore } from '@infrastructure/event-sourcing/drivers/postgres/postgres.snapshot-store';

const interceptors = [
{
Expand All @@ -36,6 +43,14 @@ const interceptors = [
autoSchemaFile: true,
}),

EventSourcingModule.forRoot({
eventStore: { provide: EVENT_STORE, useClass: PostgresEventStore },
snapshotStore: {
provide: SNAPSHOT_STORE,
useClass: PostgresSnapshotStore,
},
}),

// Modules
UserModule,
WalletModule,
Expand Down
5 changes: 5 additions & 0 deletions src/configs/app.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ export const routesV1 = {
root: walletsRoot,
delete: `/${walletsRoot}/:id`,
},
transaction: {
deposit: `${walletsRoot}/:walletId/deposit`,
withdraw: `${walletsRoot}/:walletId/withdraw`,
getTransactions: `${walletsRoot}/:walletId/transactions`,
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { DomainEvent } from '@libs/ddd/domain-event.base';
import {
EventEnvelopeFilter,
EventStreamFilter,
IEventStore,
} from '@libs/event-sourcing/interfaces/event-store.interface';
import { EventEnvelope } from '@libs/event-sourcing/models/event-envelope';
import { EventStream } from '@libs/event-sourcing/models/event-stream';
import { EventStoreVersionConflictException } from '@libs/event-sourcing/exceptions/event-store-version-conflict.exception';
import { EventMap } from '@libs/event-sourcing/services/event-map';

export class InMemoryEventStore implements IEventStore {
/** streamId → ordered list of envelopes (version is 1-based index) */
private readonly streams = new Map<string, EventEnvelope[]>();

constructor(private readonly eventMap?: EventMap) {}

async connect(): Promise<void> {}
async disconnect(): Promise<void> {}
async ensureCollection(_pool?: string): Promise<void> {}

async appendEvents(
stream: EventStream,
expectedVersion: number,
events: DomainEvent[],
_pool?: string,
): Promise<EventEnvelope[]> {
if (events.length === 0) return [];

const stored = this.streams.get(stream.streamId) ?? [];
const currentVersion = stored.length;

if (currentVersion !== expectedVersion) {
throw new EventStoreVersionConflictException(
stream.streamId,
expectedVersion,
currentVersion,
);
}

const newEnvelopes: EventEnvelope[] = events.map((event, i) =>
EventEnvelope.fromDomainEvent(event, currentVersion + i + 1),
);

this.streams.set(stream.streamId, [...stored, ...newEnvelopes]);
return newEnvelopes;
}

async *getEvents(
stream: EventStream,
filter?: EventStreamFilter,
): AsyncGenerator<DomainEvent> {
const stored = this.streams.get(stream.streamId) ?? [];
const fromVersion = filter?.fromVersion ?? 1;

for (const envelope of stored) {
if (envelope.metadata.version >= fromVersion) {
yield this.deserializeEnvelope(envelope);
}
}
}

async getAllEnvelopes(filter: EventEnvelopeFilter): Promise<EventEnvelope[]> {
const result: EventEnvelope[] = [];

for (const envelopes of this.streams.values()) {
for (const envelope of envelopes) {
const { occurredOn } = envelope.metadata;
if (filter.from && occurredOn < filter.from) continue;
if (filter.to && occurredOn > filter.to) continue;
result.push(envelope);
}
}

return result.sort(
(a, b) =>
a.metadata.occurredOn.getTime() - b.metadata.occurredOn.getTime(),
);
}

/** Used in tests to directly inspect stored envelopes */
getStoredEnvelopes(streamId: string): EventEnvelope[] {
return this.streams.get(streamId) ?? [];
}

private deserializeEnvelope(envelope: EventEnvelope): DomainEvent {
if (this.eventMap) {
return this.eventMap.deserialize(envelope.event, envelope.payload);
}
return envelope.payload as unknown as DomainEvent;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { ISnapshotStore } from '@libs/event-sourcing/interfaces/snapshot-store.interface';
import { EventStream } from '@libs/event-sourcing/models/event-stream';
import { Snapshot } from '@libs/event-sourcing/models/snapshot';

export class InMemorySnapshotStore implements ISnapshotStore {
private readonly snapshots = new Map<string, Snapshot[]>();

async connect(): Promise<void> {}
async disconnect(): Promise<void> {}
async ensureCollection(_pool?: string): Promise<void> {}

async appendSnapshot(
stream: EventStream,
version: number,
payload: Record<string, unknown>,
_pool?: string,
): Promise<void> {
const existing = this.snapshots.get(stream.streamId) ?? [];
const snapshot = new Snapshot({
aggregateId: stream.aggregateId,
aggregateVersion: version,
payload,
});
this.snapshots.set(stream.streamId, [...existing, snapshot]);
}

async getLastSnapshot(
stream: EventStream,
_pool?: string,
): Promise<Snapshot | null> {
const stored = this.snapshots.get(stream.streamId);
if (!stored || stored.length === 0) return null;

return stored.reduce((latest, current) =>
current.aggregateVersion > latest.aggregateVersion ? current : latest,
);
}
}
Loading