diff --git a/.changeset/add-events-package.md b/.changeset/add-events-package.md new file mode 100644 index 0000000..a1efb46 --- /dev/null +++ b/.changeset/add-events-package.md @@ -0,0 +1,5 @@ +--- +"@polkadot-apps/events": minor +--- + +Add events package for watching blockchain smart contract events with resilient auto-resubscription and typed decoding via Ink SDK. diff --git a/packages/events/README.md b/packages/events/README.md new file mode 100644 index 0000000..acd9679 --- /dev/null +++ b/packages/events/README.md @@ -0,0 +1,222 @@ +# @polkadot-apps/events + +Blockchain event watching for Polkadot smart contracts with auto-resubscribe and typed decoding via Ink SDK. + +## Install + +```bash +pnpm add @polkadot-apps/events +``` + +This package depends on `@polkadot-apps/chain-client`, `@polkadot-apps/descriptors`, `@polkadot-apps/logger`, `polkadot-api`, and `@polkadot-api/sdk-ink`, which are installed automatically. + +## Quick start + +```typescript +import { EventClient } from "@polkadot-apps/events"; +import { dam } from "./descriptors/dam.js"; // Ink codegen output + +const client = new EventClient(); +await client.connect(); + +const sub = client.watchContractEvent(dam, CONTRACT_ADDRESS, (event, meta) => { + console.log(event.type, event.value); + console.log(`Block #${meta.block.number}`); +}); + +// Later +sub.unsubscribe(); +client.destroy(); +``` + +## EventClient + +The primary interface for watching smart contract events. Handles chain connection, address filtering, Ink ABI decoding, and resilient resubscription on transient errors. + +### Creating a client + +```typescript +import { EventClient } from "@polkadot-apps/events"; + +const client = new EventClient({ + env: "paseo", // Optional. Chain environment. Default: "paseo". +}); +``` + +### Connecting + +Call `connect()` before any watch method. Resolves the chain API via `@polkadot-apps/chain-client`. + +```typescript +await client.connect(); +``` + +Duplicate calls are ignored. If the underlying chain-client fails, an `EventConnectionError` is thrown. + +### Watching contract events + +Subscribe to typed events emitted by a deployed Ink smart contract. Events are decoded using the contract's ABI descriptors. + +```typescript +import { dam } from "./descriptors/dam.js"; + +const sub = client.watchContractEvent( + dam, // Ink contract descriptors (from codegen) + "0xABCD...1234", // Deployed contract address + (event, meta) => { + console.log(event.type); // Typed event discriminant + console.log(event.value); // Typed event payload + console.log(meta.block.number); // Block number + console.log(meta.block.hash); // Block hash + }, + { + retryDelayMs: 2000, // Optional. Delay before retry. Default: 2000. + maxRetries: 5, // Optional. Max consecutive retries (0 = unlimited). Default: 5. + onRetry: (error, attempt) => { + console.warn(`Retry ${attempt}: ${error.message}`); + }, + onFatalError: (error) => { + console.error("Gave up:", error.message); + }, + }, +); + +sub.unsubscribe(); +``` + +Internally, the client watches `Revive.ContractEmitted` events filtered by the contract address (case-insensitive), then decodes each raw event through the Ink SDK. Decode errors are logged and skipped — the subscription stays alive. + +### Watching raw contract events + +If you don't use Ink descriptors (e.g. CDM contracts) or need the raw bytes, use `watchRawContractEvent`. It filters `Revive.ContractEmitted` by address but skips Ink SDK decoding — the callback receives the raw papi payload directly. + +```typescript +import type { RawContractEvent } from "@polkadot-apps/events"; + +const sub = client.watchRawContractEvent( + "0xABCD...1234", + (event: RawContractEvent, meta) => { + console.log(event.contract.asHex()); // Contract address + console.log(event.data.asHex()); // Raw event data + console.log(event.topics); // Event topics + console.log(meta.block.number); // Block number + }, + { mode: "best" }, // Optional. Same options as watchContractEvent. +); + +sub.unsubscribe(); +``` + +### Block mode: finalized vs best + +By default, events come from **finalized** blocks — safe from reorgs but with ~12-18s of latency. For lower-latency use cases (e.g. UI updates), pass `mode: "best"` to watch events from the latest unfinalized blocks: + +```typescript +// Low-latency: events from best (unfinalized) blocks +const sub = client.watchContractEvent(dam, address, (event, meta) => { + console.log("Best block event:", event.type); +}, { mode: "best" }); + +// Default: events from finalized blocks +const sub2 = client.watchContractEvent(dam, address, (event, meta) => { + console.log("Finalized event:", event.type); +}, { mode: "finalized" }); // same as omitting mode +``` + +The `"best"` mode uses papi's unsafe API to watch `System.Events` storage at best blocks, bypassing the finalized-only limitation of papi's `.watch()`. Events from best blocks may be reverted during chain reorganizations. + +### Cleanup + +```typescript +client.destroy(); // Stops all subscriptions and resets state. Safe to call multiple times. +``` + +## Error handling + +All errors extend `EventError`. Catch the base class to handle any error from this package. + +```typescript +import { + EventError, + EventConnectionError, + EventSubscriptionError, +} from "@polkadot-apps/events"; + +try { + await client.connect(); +} catch (err) { + if (err instanceof EventConnectionError) { + console.error("Connection failed:", err.message); + } else if (err instanceof EventError) { + console.error("Events error:", err.message); + } +} +``` + +| Error class | When it is thrown | Extra properties | +|-------------|-------------------|------------------| +| `EventConnectionError` | `connect()` fails or watch called before `connect()` | -- | +| `EventSubscriptionError` | Subscription retries exhausted | `attempts: number` | + +## API + +### EventClient + +```typescript +class EventClient { + constructor(config?: EventClientConfig) + connect(): Promise + watchContractEvent( + contractDescriptors: D, + address: string, + callback: (event: D["__types"]["event"], meta: EventOccurrence["meta"]) => void, + options?: WatchOptions, + ): Unsubscribable + watchRawContractEvent( + address: string, + callback: (event: RawContractEvent, meta: EventOccurrence["meta"]) => void, + options?: WatchOptions, + ): Unsubscribable + destroy(): void +} +``` + +## Types + +```typescript +interface EventClientConfig { + env?: Environment; // Default: "paseo" +} + +type BlockMode = "finalized" | "best"; + +interface WatchOptions { + mode?: BlockMode; // Default: "finalized" + retryDelayMs?: number; // Default: 2000 + maxRetries?: number; // Default: 5 (0 = unlimited) + onRetry?: (error: Error, attempt: number) => void; + onFatalError?: (error: Error) => void; +} + +interface EventOccurrence { + payload: T; + meta: { + phase: { type: string; value?: number }; + block: { hash: string; number: number }; + }; +} + +interface RawContractEvent { + contract: { asHex: () => string }; + data: { asHex: () => string }; + topics: Array<{ asHex: () => string }>; +} + +interface Unsubscribable { + unsubscribe: () => void; +} +``` + +## License + +Apache-2.0 diff --git a/packages/events/package.json b/packages/events/package.json new file mode 100644 index 0000000..5d2f595 --- /dev/null +++ b/packages/events/package.json @@ -0,0 +1,32 @@ +{ + "name": "@polkadot-apps/events", + "description": "Blockchain event watching for Polkadot smart contracts with auto-resubscribe and typed decoding", + "version": "0.1.0", + "type": "module", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "files": ["dist"], + "publishConfig": { + "access": "public" + }, + "scripts": { + "build": "tsc -p tsconfig.json", + "clean": "rm -rf dist" + }, + "dependencies": { + "@polkadot-apps/chain-client": "workspace:*", + "@polkadot-apps/descriptors": "workspace:*", + "@polkadot-apps/logger": "workspace:*", + "polkadot-api": "catalog:", + "@polkadot-api/sdk-ink": "catalog:" + }, + "devDependencies": { + "typescript": "catalog:" + } +} diff --git a/packages/events/src/client.ts b/packages/events/src/client.ts new file mode 100644 index 0000000..032afc8 --- /dev/null +++ b/packages/events/src/client.ts @@ -0,0 +1,701 @@ +import { createLogger } from "@polkadot-apps/logger"; + +import { EventConnectionError } from "./errors.js"; +import { resilientSubscribe } from "./watch.js"; +import type { Environment } from "@polkadot-apps/chain-client"; +import type { + EventClientConfig, + EventOccurrence, + RawContractEvent, + Unsubscribable, + WatchOptions, +} from "./types.js"; + +const log = createLogger("events"); + +/** @internal Result of resolving a chain environment. */ +interface ResolvedChain { + api: any; + /** Unsafe API for asset hub — needed for best-block subscriptions (typed API rejects + * `watchValue("best")` when runtime metadata is incompatible with codegen'd descriptors). + * + * TODO: This could be avoided if chain-client exposed `getUnsafeApi()` on its + * typed API objects (e.g. `api.assetHub.getUnsafeApi()`), or returned raw + * PolkadotClients alongside typed APIs so callers don't need to call + * `getClient(descriptor)` themselves. */ + unsafeAssetHubApi: any; +} + +/** @internal Resolver function that returns the chain API for a given environment. */ +type ApiResolver = (env: Environment) => Promise; + +/** Default resolver uses chain-client's getChainAPI + getClient for the unsafe API. */ +async function defaultResolver(env: Environment): Promise { + const { getChainAPI, getClient } = await import("@polkadot-apps/chain-client"); + const [{ paseo_asset_hub }, { polkadot_asset_hub }, { kusama_asset_hub }] = await Promise.all([ + import("@polkadot-apps/descriptors/paseo-asset-hub"), + import("@polkadot-apps/descriptors/polkadot-asset-hub"), + import("@polkadot-apps/descriptors/kusama-asset-hub"), + ]); + const descriptorMap: Record = { + paseo: paseo_asset_hub, + polkadot: polkadot_asset_hub, + kusama: kusama_asset_hub, + }; + const api = await getChainAPI(env); + const descriptor = descriptorMap[env]; + const unsafeAssetHubApi = descriptor ? getClient(descriptor).getUnsafeApi() : null; + return { api, unsafeAssetHubApi }; +} + +/** + * High-level client for watching blockchain events. + * + * Provides resilient contract event subscriptions with automatic + * resubscription on transient errors and typed decoding via Ink SDK. + * + * @example + * ```ts + * import { EventClient } from "@polkadot-apps/events"; + * import { dam } from "./contracts.js"; + * + * const client = new EventClient(); + * await client.connect(); + * + * client.watchContractEvent(dam, contractAddress, (event) => { + * console.log(event.type, event.value); + * }); + * + * client.destroy(); + * ``` + */ +export class EventClient { + private readonly env: Environment; + private readonly resolve: ApiResolver; + + /** Resolved chain API from chain-client. */ + private api: any = null; + /** Unsafe API for asset hub best-block queries. */ + private unsafeAssetHubApi: any = null; + + /** Active subscriptions, torn down on destroy(). */ + private subscriptions = new Set(); + + private connected = false; + private destroyed = false; + private connectPromise: Promise | null = null; + + constructor(config?: EventClientConfig & { _resolve?: ApiResolver }) { + this.env = config?.env ?? "paseo"; + this.resolve = config?._resolve ?? defaultResolver; + } + + /** + * Connect to chains via chain-client. + * + * Resolves the typed chain API and Ink SDK contract factory. + * Must be called before any watch method. + * + * @throws {EventConnectionError} If chain-client is not initialized. + */ + async connect(): Promise { + if (this.connected) { + log.warn("Already connected, ignoring duplicate connect()"); + return; + } + if (this.connectPromise) { + return this.connectPromise; + } + this.connectPromise = this.doConnect().finally(() => { + this.connectPromise = null; + }); + return this.connectPromise; + } + + private async doConnect(): Promise { + try { + const { api, unsafeAssetHubApi } = await this.resolve(this.env); + // Guard against destroy() called while awaiting + if (this.destroyed) return; + this.api = api; + this.unsafeAssetHubApi = unsafeAssetHubApi; + this.connected = true; + log.info("Connected", { env: this.env }); + } catch (error) { + throw new EventConnectionError( + `Failed to connect to chain-client: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + + /** + * Watch raw `Revive.ContractEmitted` events for a contract address. + * + * Unlike {@link watchContractEvent}, this does **not** decode events via + * the Ink SDK — the callback receives the raw papi `ContractEmitted` + * payload (with `.contract`, `.data`, `.topics`). Useful when you have + * your own ABI decoding (e.g. CDM contracts) or just need the raw bytes. + * + * @param address - Hex address of the deployed contract. + * @param callback - Called for each raw ContractEmitted event. + * @param options - Retry, error handling, and block mode options. + * @returns A handle to stop watching. + * @throws {EventConnectionError} If not connected. + */ + watchRawContractEvent( + address: string, + callback: (event: RawContractEvent, meta: EventOccurrence["meta"]) => void, + options?: WatchOptions, + ): Unsubscribable { + this.assertConnected(); + + const mode = options?.mode ?? "finalized"; + const normalizedAddress = address.toLowerCase(); + + const sub = + mode === "best" + ? this.watchAtBest(normalizedAddress, callback, options) + : this.watchFinalized(normalizedAddress, callback, options); + + this.trackSubscription(sub); + return sub; + } + + /** + * Watch a smart contract's typed events. + * + * Internally watches `Revive.ContractEmitted` filtered by the contract + * address, then decodes each raw event into typed contract events using + * the Ink SDK. + * + * By default watches **finalized** blocks. Pass `{ mode: "best" }` for + * lower-latency delivery from best (unfinalized) blocks — useful for + * UI updates where reorg risk is acceptable. + * + * @param contractDescriptors - Ink contract descriptors (from codegen). + * @param address - Hex address of the deployed contract. + * @param callback - Called for each decoded contract event. + * @param options - Retry, error handling, and block mode options. + * @returns A handle to stop watching. + * @throws {EventConnectionError} If not connected. + */ + watchContractEvent( + contractDescriptors: D, + address: string, + callback: (event: D["__types"]["event"], meta: EventOccurrence["meta"]) => void, + options?: WatchOptions, + ): Unsubscribable { + this.assertConnected(); + + const mode = options?.mode ?? "finalized"; + + const contract = this.api.contracts.getContract(contractDescriptors, address); + const normalizedAddress = address.toLowerCase(); + + const decode = (rawEvent: any, meta: EventOccurrence["meta"]) => { + try { + const decoded = contract.filterEvents([rawEvent]); + for (const evt of decoded) { + callback(evt, meta); + } + } catch (decodeError) { + log.warn( + `Failed to decode contract event: ${decodeError instanceof Error ? decodeError.message : String(decodeError)}`, + ); + } + }; + + const sub = + mode === "best" + ? this.watchAtBest(normalizedAddress, decode, options) + : this.watchFinalized(normalizedAddress, decode, options); + + this.trackSubscription(sub); + return sub; + } + + /** + * Watch via papi's event descriptor `.watch()` (finalized blocks). + */ + private watchFinalized( + normalizedAddress: string, + decode: (rawEvent: any, meta: EventOccurrence["meta"]) => void, + options?: WatchOptions, + ): Unsubscribable { + return resilientSubscribe( + this.api.assetHub.event.Revive.ContractEmitted, + (occurrence) => decode(occurrence.payload, occurrence.meta), + { + ...options, + filter: (payload: any) => + payload.contract.asHex().toLowerCase() === normalizedAddress, + }, + ); + } + + /** + * Watch `System.Events` storage at best blocks, filtering for + * `Revive.ContractEmitted` from the target address. + * + * papi's `.watch()` is hardcoded to finalized blocks, adding 12-18s + * of latency. This bypasses that by using the unsafe API's + * `watchValue("best")` on storage. + */ + private watchAtBest( + normalizedAddress: string, + decode: (rawEvent: any, meta: EventOccurrence["meta"]) => void, + options?: WatchOptions, + ): Unsubscribable { + const assetHub = this.api.assetHub; + const unsafeApi = this.unsafeAssetHubApi; + const eventFilter = assetHub.event.Revive.ContractEmitted; + + if (!unsafeApi) { + log.warn("Unsafe API not available — falling back to finalized mode"); + return this.watchFinalized(normalizedAddress, decode, options); + } + + let subscription: { unsubscribe: () => void } | null = null; + let stopped = false; + let retryTimeout: ReturnType | null = null; + let consecutiveErrors = 0; + + const retryDelay = options?.retryDelayMs ?? 2000; + const maxRetries = options?.maxRetries ?? 5; + + const subscribe = () => { + if (stopped) return; + + try { + const events$ = unsafeApi.query.System.Events.watchValue("best"); + subscription = events$.subscribe({ + next: (systemEvents: any[]) => { + consecutiveErrors = 0; + const matched = eventFilter.filter(systemEvents.map((e: any) => e.event)); + for (const raw of matched) { + if (raw.contract.asHex().toLowerCase() !== normalizedAddress) continue; + decode(raw, { + phase: { type: "ApplyExtrinsic" }, + block: { hash: "", number: 0 }, + }); + } + }, + error: (error: Error) => { + consecutiveErrors++; + log.warn( + `Best-block subscription error (attempt ${consecutiveErrors}): ${error.message}`, + ); + + if (maxRetries > 0 && consecutiveErrors >= maxRetries) { + stopped = true; + options?.onFatalError?.(error); + return; + } + + options?.onRetry?.(error, consecutiveErrors); + retryTimeout = setTimeout(subscribe, retryDelay); + }, + }); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + consecutiveErrors++; + log.warn( + `Best-block subscribe failed (attempt ${consecutiveErrors}): ${err.message}`, + ); + + if (maxRetries > 0 && consecutiveErrors >= maxRetries) { + stopped = true; + options?.onFatalError?.(err); + return; + } + + options?.onRetry?.(err, consecutiveErrors); + retryTimeout = setTimeout(subscribe, retryDelay); + } + }; + + subscribe(); + + return { + unsubscribe() { + stopped = true; + if (retryTimeout) clearTimeout(retryTimeout); + subscription?.unsubscribe(); + }, + }; + } + + /** + * Tear down all active subscriptions and reset state. + */ + destroy(): void { + for (const sub of this.subscriptions) { + sub.unsubscribe(); + } + this.subscriptions.clear(); + this.api = null; + this.unsafeAssetHubApi = null; + this.connected = false; + this.destroyed = true; + this.connectPromise = null; + log.info("Destroyed"); + } + + private assertConnected(): void { + if (!this.connected || !this.api) { + throw new EventConnectionError(); + } + } + + private trackSubscription(sub: Unsubscribable): Unsubscribable { + const tracked: Unsubscribable = { + unsubscribe: () => { + this.subscriptions.delete(tracked); + sub.unsubscribe(); + }, + }; + this.subscriptions.add(tracked); + return tracked; + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +if (import.meta.vitest) { + const { describe, test, expect, vi, beforeEach } = import.meta.vitest; + + let mockHandlers: any = null; + let mockFilter: any = null; + let mockBestHandlers: any = null; + + const mockContract = { + filterEvents: vi.fn((): unknown[] => []), + }; + + const mockEventFilter = vi.fn((): any[] => []); + + const mockUnsafeAssetHubApi = { + query: { + System: { + Events: { + watchValue: (_mode: string) => ({ + subscribe: (handlers: any) => { + mockBestHandlers = handlers; + return { unsubscribe: vi.fn() }; + }, + }), + }, + }, + }, + }; + + const mockApi = { + assetHub: { + event: { + Revive: { + ContractEmitted: { + watch: (filter?: (value: any) => boolean) => { + mockFilter = filter; + return { + subscribe: (handlers: any) => { + mockHandlers = handlers; + return { unsubscribe: vi.fn() }; + }, + }; + }, + filter: mockEventFilter, + }, + }, + }, + }, + contracts: { + getContract: vi.fn(() => mockContract), + }, + }; + + const mockResolver = vi.fn(async () => ({ + api: mockApi, + unsafeAssetHubApi: mockUnsafeAssetHubApi, + })); + + function createClient() { + return new EventClient({ _resolve: mockResolver }); + } + + beforeEach(() => { + vi.clearAllMocks(); + mockHandlers = null; + mockFilter = null; + mockBestHandlers = null; + mockContract.filterEvents.mockReturnValue([]); + mockEventFilter.mockReturnValue([]); + }); + + describe("EventClient", () => { + test("throws EventConnectionError before connect", () => { + const client = createClient(); + expect(() => client.watchContractEvent({} as any, "0x1234", vi.fn())).toThrow( + EventConnectionError, + ); + }); + + test("connect resolves chain API", async () => { + const client = createClient(); + await client.connect(); + expect(mockResolver).toHaveBeenCalledWith("paseo"); + // Should not throw after connect + expect(() => client.watchContractEvent({} as any, "0x1234", vi.fn())).not.toThrow( + EventConnectionError, + ); + }); + + test("duplicate connect is ignored", async () => { + const client = createClient(); + await client.connect(); + await client.connect(); + expect(mockResolver).toHaveBeenCalledTimes(1); + }); + + test("connect wraps resolver errors as EventConnectionError", async () => { + const failResolver = vi.fn(async () => { + throw new Error("no chain"); + }); + const client = new EventClient({ _resolve: failResolver }); + await expect(client.connect()).rejects.toThrow(EventConnectionError); + }); + + test("watchContractEvent creates contract and subscribes", async () => { + const client = createClient(); + await client.connect(); + + const descriptors = { __types: { event: {} } } as any; + client.watchContractEvent(descriptors, "0xABCD", vi.fn()); + + expect(mockApi.contracts.getContract).toHaveBeenCalledWith(descriptors, "0xABCD"); + expect(mockHandlers).not.toBeNull(); + }); + + test("watchContractEvent filters by address (case-insensitive)", async () => { + const client = createClient(); + await client.connect(); + + client.watchContractEvent({} as any, "0xABCD", vi.fn()); + + expect(mockFilter).toBeDefined(); + expect(mockFilter({ contract: { asHex: () => "0xabcd" } })).toBe(true); + expect(mockFilter({ contract: { asHex: () => "0xABCD" } })).toBe(true); + expect(mockFilter({ contract: { asHex: () => "0x1234" } })).toBe(false); + }); + + test("watchContractEvent decodes and calls back", async () => { + const client = createClient(); + await client.connect(); + + const callback = vi.fn(); + const decodedEvent = { type: "Transfer", value: { amount: 100n } }; + mockContract.filterEvents.mockReturnValue([decodedEvent]); + + client.watchContractEvent({} as any, "0xABCD", callback); + + mockHandlers.next({ + payload: { contract: { asHex: () => "0xabcd" }, data: "0x", topics: [] }, + meta: { phase: { type: "ApplyExtrinsic" }, block: { hash: "0x", number: 1 } }, + }); + + expect(mockContract.filterEvents).toHaveBeenCalled(); + expect(callback).toHaveBeenCalledWith(decodedEvent, { + phase: { type: "ApplyExtrinsic" }, + block: { hash: "0x", number: 1 }, + }); + }); + + test("watchContractEvent survives decode errors", async () => { + const client = createClient(); + await client.connect(); + + const callback = vi.fn(); + mockContract.filterEvents.mockImplementation(() => { + throw new Error("bad ABI"); + }); + + client.watchContractEvent({} as any, "0xABCD", callback); + + // Should not throw — error is logged and swallowed + expect(() => + mockHandlers.next({ + payload: { contract: { asHex: () => "0xabcd" } }, + meta: { phase: { type: "ApplyExtrinsic" }, block: { hash: "0x", number: 1 } }, + }), + ).not.toThrow(); + expect(callback).not.toHaveBeenCalled(); + }); + + test("destroy tears down all subscriptions", async () => { + const client = createClient(); + await client.connect(); + + client.watchContractEvent({} as any, "0x1", vi.fn()); + client.watchContractEvent({} as any, "0x2", vi.fn()); + + client.destroy(); + + expect(() => client.watchContractEvent({} as any, "0x3", vi.fn())).toThrow( + EventConnectionError, + ); + }); + + test("individual unsubscribe removes from tracked set", async () => { + const client = createClient(); + await client.connect(); + + const sub = client.watchContractEvent({} as any, "0x1", vi.fn()); + sub.unsubscribe(); + + // destroy should still work without errors + client.destroy(); + }); + + test("mode best subscribes via unsafe API watchValue", async () => { + const client = createClient(); + await client.connect(); + + client.watchContractEvent({} as any, "0xABCD", vi.fn(), { mode: "best" }); + + // Should use best-block path, not finalized + expect(mockHandlers).toBeNull(); + expect(mockBestHandlers).not.toBeNull(); + }); + + test("mode best filters and decodes contract events", async () => { + const client = createClient(); + await client.connect(); + + const callback = vi.fn(); + const decodedEvent = { type: "Transfer", value: { amount: 50n } }; + const rawEvent = { contract: { asHex: () => "0xabcd" }, data: "0x", topics: [] }; + + mockEventFilter.mockReturnValue([rawEvent]); + mockContract.filterEvents.mockReturnValue([decodedEvent]); + + client.watchContractEvent({} as any, "0xABCD", callback, { mode: "best" }); + + // Simulate system events arriving at best block + mockBestHandlers.next([{ event: rawEvent }]); + + expect(mockEventFilter).toHaveBeenCalled(); + expect(mockContract.filterEvents).toHaveBeenCalledWith([rawEvent]); + expect(callback).toHaveBeenCalledWith( + decodedEvent, + expect.objectContaining({ + phase: { type: "ApplyExtrinsic" }, + }), + ); + }); + + test("mode best filters by address (case-insensitive)", async () => { + const client = createClient(); + await client.connect(); + + const callback = vi.fn(); + const matchEvent = { contract: { asHex: () => "0xabcd" } }; + const otherEvent = { contract: { asHex: () => "0x9999" } }; + + mockEventFilter.mockReturnValue([matchEvent, otherEvent]); + mockContract.filterEvents.mockReturnValue([]); + + client.watchContractEvent({} as any, "0xABCD", callback, { mode: "best" }); + + mockBestHandlers.next([{ event: matchEvent }, { event: otherEvent }]); + + // Only the matching address should be decoded + expect(mockContract.filterEvents).toHaveBeenCalledWith([matchEvent]); + expect(mockContract.filterEvents).not.toHaveBeenCalledWith([otherEvent]); + }); + + test("mode best unsubscribe stops subscription", async () => { + const client = createClient(); + await client.connect(); + + const sub = client.watchContractEvent({} as any, "0x1", vi.fn(), { mode: "best" }); + sub.unsubscribe(); + + client.destroy(); + }); + }); + + describe("watchRawContractEvent", () => { + test("subscribes and filters by address", async () => { + const client = createClient(); + await client.connect(); + + const callback = vi.fn(); + client.watchRawContractEvent("0xABCD", callback); + + expect(mockHandlers).not.toBeNull(); + expect(mockFilter).toBeDefined(); + expect(mockFilter({ contract: { asHex: () => "0xabcd" } })).toBe(true); + expect(mockFilter({ contract: { asHex: () => "0x1234" } })).toBe(false); + }); + + test("passes raw event without Ink decoding", async () => { + const client = createClient(); + await client.connect(); + + const callback = vi.fn(); + client.watchRawContractEvent("0xABCD", callback); + + const rawPayload = { + contract: { asHex: () => "0xabcd" }, + data: { asHex: () => "0xdeadbeef" }, + topics: [], + }; + mockHandlers.next({ + payload: rawPayload, + meta: { phase: { type: "ApplyExtrinsic" }, block: { hash: "0x01", number: 42 } }, + }); + + // Should NOT use Ink SDK decoding + expect(mockContract.filterEvents).not.toHaveBeenCalled(); + // Should receive the raw event and meta + expect(callback).toHaveBeenCalledWith(rawPayload, { + phase: { type: "ApplyExtrinsic" }, + block: { hash: "0x01", number: 42 }, + }); + }); + + test("throws before connect", () => { + const client = createClient(); + expect(() => client.watchRawContractEvent("0x1", vi.fn())).toThrow( + EventConnectionError, + ); + }); + + test("works with mode best", async () => { + const client = createClient(); + await client.connect(); + + const callback = vi.fn(); + const rawEvent = { + contract: { asHex: () => "0xabcd" }, + data: { asHex: () => "0xff" }, + topics: [], + }; + + mockEventFilter.mockReturnValue([rawEvent]); + + client.watchRawContractEvent("0xABCD", callback, { mode: "best" }); + + mockBestHandlers.next([{ event: rawEvent }]); + + expect(mockContract.filterEvents).not.toHaveBeenCalled(); + expect(callback).toHaveBeenCalledWith( + rawEvent, + expect.objectContaining({ + phase: { type: "ApplyExtrinsic" }, + }), + ); + }); + }); +} diff --git a/packages/events/src/errors.ts b/packages/events/src/errors.ts new file mode 100644 index 0000000..6c8ac36 --- /dev/null +++ b/packages/events/src/errors.ts @@ -0,0 +1,82 @@ +/** + * Base class for all events package errors. + * + * Use `instanceof EventError` to catch any error originating + * from the events package. + */ +export class EventError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "EventError"; + } +} + +/** + * The client is not connected. + * + * Thrown when a watch method is called before {@link EventClient.connect}. + */ +export class EventConnectionError extends EventError { + constructor(message: string = "Not connected. Call connect() first.", options?: ErrorOptions) { + super(message, options); + this.name = "EventConnectionError"; + } +} + +/** + * A subscription failed and could not be recovered via retry. + * + * Carries the number of attempts made and the original error as `cause`. + */ +export class EventSubscriptionError extends EventError { + /** Number of retry attempts made before giving up. */ + readonly attempts: number; + + constructor(message: string, attempts: number, options?: ErrorOptions) { + super(`Subscription failed after ${attempts} attempt(s): ${message}`, options); + this.name = "EventSubscriptionError"; + this.attempts = attempts; + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +if (import.meta.vitest) { + const { describe, test, expect } = import.meta.vitest; + + describe("EventError hierarchy", () => { + test("EventError is instanceof Error", () => { + const err = new EventError("boom"); + expect(err).toBeInstanceOf(Error); + expect(err).toBeInstanceOf(EventError); + expect(err.name).toBe("EventError"); + expect(err.message).toBe("boom"); + }); + + test("EventConnectionError extends EventError", () => { + const err = new EventConnectionError(); + expect(err).toBeInstanceOf(EventError); + expect(err).toBeInstanceOf(EventConnectionError); + expect(err.name).toBe("EventConnectionError"); + expect(err.message).toBe("Not connected. Call connect() first."); + }); + + test("EventConnectionError accepts custom message", () => { + const err = new EventConnectionError("custom"); + expect(err.message).toBe("custom"); + }); + + test("EventSubscriptionError extends EventError", () => { + const cause = new Error("network down"); + const err = new EventSubscriptionError("network down", 3, { cause }); + expect(err).toBeInstanceOf(EventError); + expect(err).toBeInstanceOf(EventSubscriptionError); + expect(err.name).toBe("EventSubscriptionError"); + expect(err.attempts).toBe(3); + expect(err.message).toBe("Subscription failed after 3 attempt(s): network down"); + expect(err.cause).toBe(cause); + }); + }); +} diff --git a/packages/events/src/filter.ts b/packages/events/src/filter.ts new file mode 100644 index 0000000..9520f64 --- /dev/null +++ b/packages/events/src/filter.ts @@ -0,0 +1,103 @@ +import type { EventDescriptor } from "./types.js"; + +/** + * Filter typed events from a raw events array. + * + * Thin wrapper over papi's `.filter()` that returns a typed array, + * possibly empty. + * + * @internal + */ +export function filterEvents(descriptor: EventDescriptor, events: unknown[]): T[] { + return descriptor.filter(events); +} + +/** + * Extract exactly one typed event, or throw if none found. + * + * Replaces the common pattern: + * ```ts + * const matches = api.event.X.filter(result.events); + * if (matches.length === 0) throw new Error("..."); + * const first = matches[0]; + * ``` + * + * @internal + */ +export function expectEvent(descriptor: EventDescriptor, events: unknown[]): T { + const matches = descriptor.filter(events); + if (matches.length === 0) { + throw new Error("Expected event not found in transaction result"); + } + return matches[0]; +} + +/** + * Extract one or more typed events, or throw if none found. + * + * @internal + */ +export function expectEvents(descriptor: EventDescriptor, events: unknown[]): T[] { + const matches = descriptor.filter(events); + if (matches.length === 0) { + throw new Error("Expected event not found in transaction result"); + } + return matches; +} + +// ============================================================================ +// Tests +// ============================================================================ + +if (import.meta.vitest) { + const { describe, test, expect } = import.meta.vitest; + + function mockDescriptor(results: T[]): EventDescriptor { + return { + watch: () => ({ + subscribe: () => ({ unsubscribe: () => {} }), + }), + filter: () => results, + }; + } + + describe("filterEvents", () => { + test("returns matching events", () => { + const desc = mockDescriptor(["a", "b"]); + expect(filterEvents(desc, [])).toEqual(["a", "b"]); + }); + + test("returns empty array when no matches", () => { + const desc = mockDescriptor([]); + expect(filterEvents(desc, [])).toEqual([]); + }); + }); + + describe("expectEvent", () => { + test("returns first match", () => { + const desc = mockDescriptor([{ id: 1 }, { id: 2 }]); + expect(expectEvent(desc, [])).toEqual({ id: 1 }); + }); + + test("throws when no matches", () => { + const desc = mockDescriptor([]); + expect(() => expectEvent(desc, [])).toThrow( + "Expected event not found in transaction result", + ); + }); + }); + + describe("expectEvents", () => { + test("returns all matches", () => { + const desc = mockDescriptor(["a", "b", "c"]); + expect(expectEvents(desc, [])).toEqual(["a", "b", "c"]); + }); + + test("throws when no matches", () => { + const desc = mockDescriptor([]); + expect(() => expectEvents(desc, [])).toThrow( + "Expected event not found in transaction result", + ); + }); + }); +} diff --git a/packages/events/src/index.ts b/packages/events/src/index.ts new file mode 100644 index 0000000..4240694 --- /dev/null +++ b/packages/events/src/index.ts @@ -0,0 +1,15 @@ +export { EventClient } from "./client.js"; +export { + EventError, + EventConnectionError, + EventSubscriptionError, +} from "./errors.js"; +export type { + BlockMode, + EventClientConfig, + EventDescriptor, + EventOccurrence, + RawContractEvent, + WatchOptions, + Unsubscribable, +} from "./types.js"; diff --git a/packages/events/src/types.ts b/packages/events/src/types.ts new file mode 100644 index 0000000..49bce52 --- /dev/null +++ b/packages/events/src/types.ts @@ -0,0 +1,84 @@ +import type { Environment } from "@polkadot-apps/chain-client"; + +// ============================================================================ +// Subscription handle +// ============================================================================ + +/** Handle returned by watch methods. Call `unsubscribe()` to stop receiving events. */ +export interface Unsubscribable { + unsubscribe: () => void; +} + +// ============================================================================ +// Raw contract event +// ============================================================================ + +/** Raw `Revive.ContractEmitted` payload before Ink SDK decoding. */ +export interface RawContractEvent { + contract: { asHex: () => string }; + data: { asHex: () => string }; + topics: Array<{ asHex: () => string }>; +} + +// ============================================================================ +// Event shapes +// ============================================================================ + +/** + * Structural type matching papi's `api.event.Pallet.Event`. + * + * This allows the watch/filter helpers to accept any papi event descriptor + * without importing concrete chain types. + */ +export interface EventDescriptor { + watch: (filter?: (value: T) => boolean) => { + subscribe: (handlers: { + next: (event: EventOccurrence) => void; + error: (error: Error) => void; + }) => { unsubscribe: () => void }; + }; + filter: (events: unknown[]) => T[]; +} + +/** Shape of an event emitted by papi's `.watch()` Observable. */ +export interface EventOccurrence { + payload: T; + meta: { + phase: { type: string; value?: number }; + block: { hash: string; number: number }; + }; +} + +// ============================================================================ +// Configuration +// ============================================================================ + +/** Configuration for {@link EventClient}. */ +export interface EventClientConfig { + /** Which environment to connect to. @default "paseo" */ + env?: Environment; +} + +/** + * Block selection mode for event watching. + * + * - `"finalized"` — Events from finalized blocks (default). Higher latency (~12-18s) + * but guaranteed to not be reverted. + * - `"best"` — Events from best (latest) blocks. Lower latency but may include + * events from blocks that are later reorged. + */ +export type BlockMode = "finalized" | "best"; + +/** Options controlling retry behaviour for watch methods. */ +export interface WatchOptions { + /** Block selection mode. @default "finalized" */ + mode?: BlockMode; + /** Delay in ms before resubscribing after a transient error. @default 2000 */ + retryDelayMs?: number; + /** Maximum consecutive retry attempts before giving up (0 = unlimited). @default 5 */ + maxRetries?: number; + /** Called when a transient error triggers a resubscription attempt. */ + onRetry?: (error: Error, attempt: number) => void; + /** Called when retries are exhausted and watching has stopped. */ + onFatalError?: (error: Error) => void; +} diff --git a/packages/events/src/watch.ts b/packages/events/src/watch.ts new file mode 100644 index 0000000..de0243d --- /dev/null +++ b/packages/events/src/watch.ts @@ -0,0 +1,261 @@ +import { createLogger } from "@polkadot-apps/logger"; + +import { EventSubscriptionError } from "./errors.js"; +import type { EventDescriptor, EventOccurrence, Unsubscribable, WatchOptions } from "./types.js"; + +const log = createLogger("events"); + +const DEFAULT_RETRY_DELAY_MS = 2000; +const DEFAULT_MAX_RETRIES = 5; + +/** + * Subscribe to a papi event descriptor with automatic resubscription + * on transient errors (e.g. `BlockNotPinnedError`). + * + * The consecutive error counter resets on any successful event delivery, + * so intermittent hiccups don't accumulate toward the retry limit. + * + * @internal — used by {@link EventClient}, not part of public API. + */ +export function resilientSubscribe( + descriptor: EventDescriptor, + callback: (event: EventOccurrence) => void, + options?: WatchOptions & { filter?: (payload: T) => boolean }, +): Unsubscribable { + const retryDelay = options?.retryDelayMs ?? DEFAULT_RETRY_DELAY_MS; + const maxRetries = options?.maxRetries ?? DEFAULT_MAX_RETRIES; + + let stopped = false; + let currentSub: { unsubscribe: () => void } | null = null; + let retryTimeout: ReturnType | null = null; + let consecutiveErrors = 0; + + function subscribe() { + if (stopped) return; + + const observable = options?.filter ? descriptor.watch(options.filter) : descriptor.watch(); + + currentSub = observable.subscribe({ + next: (event: EventOccurrence) => { + consecutiveErrors = 0; + callback(event); + }, + error: (error: Error) => { + consecutiveErrors++; + log.warn(`Subscription error (attempt ${consecutiveErrors}): ${error.message}`); + + if (maxRetries > 0 && consecutiveErrors >= maxRetries) { + stopped = true; + const fatal = new EventSubscriptionError(error.message, consecutiveErrors, { + cause: error, + }); + options?.onFatalError?.(fatal); + return; + } + + options?.onRetry?.(error, consecutiveErrors); + retryTimeout = setTimeout(subscribe, retryDelay); + }, + }); + } + + subscribe(); + + return { + unsubscribe() { + stopped = true; + if (retryTimeout) clearTimeout(retryTimeout); + currentSub?.unsubscribe(); + }, + }; +} + +// ============================================================================ +// Tests +// ============================================================================ + +if (import.meta.vitest) { + const { describe, test, expect, vi, beforeEach, afterEach } = import.meta.vitest; + + /** Create a mock EventDescriptor where the test controls next/error delivery. */ + function createMockDescriptor() { + let handlers: { + next: (event: EventOccurrence) => void; + error: (error: Error) => void; + } | null = null; + const unsubscribeFn = vi.fn(); + let filterFn: ((value: T) => boolean) | undefined; + + const descriptor: EventDescriptor = { + watch: (filter) => { + filterFn = filter; + return { + subscribe: (h) => { + handlers = h; + return { unsubscribe: unsubscribeFn }; + }, + }; + }, + filter: () => [], + }; + + return { + descriptor, + emit: (payload: T, block = { hash: "0x00", number: 1 }) => { + handlers?.next({ + payload, + meta: { phase: { type: "ApplyExtrinsic", value: 0 }, block }, + }); + }, + emitError: (error: Error) => handlers?.error(error), + unsubscribeFn, + getFilter: () => filterFn, + isSubscribed: () => handlers !== null, + }; + } + + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe("resilientSubscribe", () => { + test("calls callback on event delivery", () => { + const mock = createMockDescriptor(); + const callback = vi.fn(); + + resilientSubscribe(mock.descriptor, callback); + mock.emit("hello"); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback.mock.calls[0][0].payload).toBe("hello"); + }); + + test("passes filter to descriptor.watch()", () => { + const mock = createMockDescriptor(); + const filter = (n: number) => n > 10; + + resilientSubscribe(mock.descriptor, vi.fn(), { filter }); + + expect(mock.getFilter()).toBe(filter); + }); + + test("resubscribes on error after delay", () => { + const mock = createMockDescriptor(); + const onRetry = vi.fn(); + + resilientSubscribe(mock.descriptor, vi.fn(), { + retryDelayMs: 1000, + onRetry, + }); + + mock.emitError(new Error("transient")); + expect(onRetry).toHaveBeenCalledWith(expect.any(Error), 1); + + // Not yet resubscribed + expect(mock.isSubscribed()).toBe(true); + + // After delay, resubscribes + vi.advanceTimersByTime(1000); + expect(mock.isSubscribed()).toBe(true); + }); + + test("calls onFatalError after maxRetries", () => { + const mock = createMockDescriptor(); + const onFatalError = vi.fn(); + + resilientSubscribe(mock.descriptor, vi.fn(), { + maxRetries: 2, + retryDelayMs: 100, + onFatalError, + }); + + // First error → retry + mock.emitError(new Error("fail1")); + vi.advanceTimersByTime(100); + + // Second error → fatal + mock.emitError(new Error("fail2")); + expect(onFatalError).toHaveBeenCalledTimes(1); + expect(onFatalError.mock.calls[0][0]).toBeInstanceOf(EventSubscriptionError); + expect(onFatalError.mock.calls[0][0].attempts).toBe(2); + }); + + test("resets consecutive error count on successful event", () => { + const mock = createMockDescriptor(); + const onFatalError = vi.fn(); + + resilientSubscribe(mock.descriptor, vi.fn(), { + maxRetries: 3, + retryDelayMs: 100, + onFatalError, + }); + + // Two errors + mock.emitError(new Error("e1")); + vi.advanceTimersByTime(100); + mock.emitError(new Error("e2")); + vi.advanceTimersByTime(100); + + // Success resets counter + mock.emit("ok"); + + // Two more errors should not trigger fatal (counter was reset) + mock.emitError(new Error("e3")); + vi.advanceTimersByTime(100); + mock.emitError(new Error("e4")); + vi.advanceTimersByTime(100); + + expect(onFatalError).not.toHaveBeenCalled(); + }); + + test("unsubscribe stops retries", () => { + const mock = createMockDescriptor(); + const onRetry = vi.fn(); + + const sub = resilientSubscribe(mock.descriptor, vi.fn(), { + retryDelayMs: 1000, + onRetry, + }); + + mock.emitError(new Error("err")); + sub.unsubscribe(); + + // Advancing time should not trigger another subscribe + vi.advanceTimersByTime(2000); + expect(onRetry).toHaveBeenCalledTimes(1); + }); + + test("unsubscribe calls underlying unsubscribe", () => { + const mock = createMockDescriptor(); + const sub = resilientSubscribe(mock.descriptor, vi.fn()); + + sub.unsubscribe(); + expect(mock.unsubscribeFn).toHaveBeenCalledTimes(1); + }); + + test("unlimited retries when maxRetries is 0", () => { + const mock = createMockDescriptor(); + const onRetry = vi.fn(); + const onFatalError = vi.fn(); + + resilientSubscribe(mock.descriptor, vi.fn(), { + maxRetries: 0, + retryDelayMs: 100, + onRetry, + onFatalError, + }); + + for (let i = 0; i < 20; i++) { + mock.emitError(new Error(`e${i}`)); + vi.advanceTimersByTime(100); + } + + expect(onRetry).toHaveBeenCalledTimes(20); + expect(onFatalError).not.toHaveBeenCalled(); + }); + }); +} diff --git a/packages/events/tsconfig.json b/packages/events/tsconfig.json new file mode 100644 index 0000000..8ffe5db --- /dev/null +++ b/packages/events/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "./dist", + "rootDir": "./src" + }, + "include": ["src/**/*"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fa1552f..8c726ed 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -296,6 +296,28 @@ importers: specifier: 'catalog:' version: 1.23.3(jiti@2.6.1)(postcss@8.5.8)(rxjs@7.8.2)(tsx@4.21.0)(yaml@2.8.3) + packages/events: + dependencies: + '@polkadot-api/sdk-ink': + specifier: 'catalog:' + version: 0.6.2(@polkadot-api/ink-contracts@0.4.6)(polkadot-api@1.23.3(jiti@2.6.1)(postcss@8.5.8)(rxjs@7.8.2)(tsx@4.21.0)(yaml@2.8.3))(rxjs@7.8.2)(typescript@5.9.3) + '@polkadot-apps/chain-client': + specifier: workspace:* + version: link:../chain-client + '@polkadot-apps/descriptors': + specifier: workspace:* + version: link:../descriptors + '@polkadot-apps/logger': + specifier: workspace:* + version: link:../logger + polkadot-api: + specifier: 'catalog:' + version: 1.23.3(jiti@2.6.1)(postcss@8.5.8)(rxjs@7.8.2)(tsx@4.21.0)(yaml@2.8.3) + devDependencies: + typescript: + specifier: 'catalog:' + version: 5.9.3 + packages/host: dependencies: polkadot-api: diff --git a/turbo.json b/turbo.json index 3739e54..7159350 100644 --- a/turbo.json +++ b/turbo.json @@ -8,7 +8,7 @@ }, "@polkadot-apps/descriptors#build": { "dependsOn": ["^build"], - "outputs": ["generated/dist/**"] + "outputs": ["chains/*/generated/dist/**"] }, "dev": { "cache": false,