diff --git a/apps/price_pusher/README.md b/apps/price_pusher/README.md index 4df9dadaf8..96c8fadc65 100644 --- a/apps/price_pusher/README.md +++ b/apps/price_pusher/README.md @@ -118,6 +118,11 @@ pnpm run start injective --grpc-endpoint https://grpc-endpoint.com \ [--pushing-frequency 10] \ [--polling-frequency 5] +# `--grpc-endpoint` accepts a fallback set. Pass the flag multiple times or +# supply a comma-separated list (`--grpc-endpoint a,b`). The pusher cycles +# round-robin through endpoints when a gRPC call fails — useful for cosmos +# nodes whose availability can be flaky. + # For Aptos pnpm run start aptos --endpoint https://fullnode.testnet.aptoslabs.com/v1 \ --pyth-contract-address 0x7e783b349d3e89cf5931af376ebeadbfab855b3fa239b7ada8f5a92fbea6b387 \ diff --git a/apps/price_pusher/package.json b/apps/price_pusher/package.json index 79abf56c0e..b878df0221 100644 --- a/apps/price_pusher/package.json +++ b/apps/price_pusher/package.json @@ -113,6 +113,10 @@ "default": "./dist/injective/command.cjs", "types": "./dist/injective/command.d.ts" }, + "./injective/endpoint-pool": { + "default": "./dist/injective/endpoint-pool.cjs", + "types": "./dist/injective/endpoint-pool.d.ts" + }, "./injective/injective": { "default": "./dist/injective/injective.cjs", "types": "./dist/injective/injective.d.ts" @@ -215,9 +219,10 @@ "dev": "ts-node src/index.ts", "prepublishOnly": "pnpm run build", "start": "node dist/index.cjs", - "test:types": "tsc" + "test:types": "tsc", + "test:unit": "test-unit" }, "type": "module", "types": "./dist/index.d.ts", - "version": "10.4.0" + "version": "10.5.0" } diff --git a/apps/price_pusher/src/injective/command.ts b/apps/price_pusher/src/injective/command.ts index ae93365cdb..bdee8f2c6e 100644 --- a/apps/price_pusher/src/injective/command.ts +++ b/apps/price_pusher/src/injective/command.ts @@ -20,10 +20,14 @@ export default { builder: { "grpc-endpoint": { description: - "gRPC endpoint URL for injective. The pusher will periodically" + + "gRPC endpoint URL(s) for injective. The pusher will periodically " + "poll for updates. The polling interval is configurable via the " + - "`polling-frequency` command-line argument.", - type: "string", + "`polling-frequency` command-line argument. " + + "Pass the flag multiple times or supply a comma-separated list " + + "(e.g. `--grpc-endpoint a,b`) to register a fallback set; the " + + "pusher will round-robin through them on gRPC errors.", + type: "array", + string: true, required: true, } as Options, network: { @@ -86,6 +90,20 @@ export default { }); const mnemonic = fs.readFileSync(mnemonicFile, "utf8").trim(); + // `grpc-endpoint` is `type: "array"` so yargs always hands us a list. Each + // entry may itself be a comma-separated string (e.g. `--grpc-endpoint a,b`), + // so split + trim + drop empties to get a clean failover set. + const grpcEndpoints: string[] = (grpcEndpoint as string[]) + .flatMap((entry) => entry.split(",")) + .map((entry) => entry.trim()) + .filter((entry) => entry.length > 0); + if (grpcEndpoints.length === 0) { + throw new Error( + "At least one --grpc-endpoint must be provided. Pass the flag once per " + + "endpoint, or supply a comma-separated list.", + ); + } + let priceItems = priceConfigs.map(({ id, alias }) => ({ id, alias })); // Better to filter out invalid price items before creating the pyth listener @@ -110,7 +128,7 @@ export default { const injectiveListener = new InjectivePriceListener( pythContractAddress, - grpcEndpoint, + grpcEndpoints, priceItems, logger.child({ module: "InjectivePriceListener" }), { @@ -120,7 +138,7 @@ export default { const injectivePusher = new InjectivePricePusher( hermesClient, pythContractAddress, - grpcEndpoint, + grpcEndpoints, logger.child({ module: "InjectivePricePusher" }), mnemonic, { diff --git a/apps/price_pusher/src/injective/endpoint-pool.ts b/apps/price_pusher/src/injective/endpoint-pool.ts new file mode 100644 index 0000000000..50f4d69d1f --- /dev/null +++ b/apps/price_pusher/src/injective/endpoint-pool.ts @@ -0,0 +1,67 @@ +import type { Logger } from "pino"; + +/** + * Round-robin cursor over a non-empty list of gRPC endpoints. Each call site + * rents the current endpoint, and the cursor advances when an attempt fails + * (see `withEndpointFailover`). The cursor is shared across every gRPC API + * helper inside a single pusher instance so that a bad endpoint affects the + * whole pusher, not just one method. + */ +export class EndpointPool { + private currentIndex = 0; + + constructor( + private readonly endpoints: readonly string[], + private readonly logger: Logger, + ) { + if (endpoints.length === 0) { + throw new Error("EndpointPool requires at least one endpoint"); + } + } + + current(): string { + // Non-null asserted: the constructor guarantees length >= 1 and `rotate` + // always wraps the index modulo `endpoints.length`. + + return this.endpoints[this.currentIndex]!; + } + + rotate(reason: unknown): string { + if (this.endpoints.length === 1) { + return this.current(); + } + const failed = this.current(); + this.currentIndex = (this.currentIndex + 1) % this.endpoints.length; + this.logger.warn( + { failedEndpoint: failed, nextEndpoint: this.current(), err: reason }, + "gRPC endpoint failed — rotating to next endpoint", + ); + return this.current(); + } + + size(): number { + return this.endpoints.length; + } +} + +/** + * Try `fn` against the current endpoint; on failure, rotate to the next + * endpoint and retry. Walks through every endpoint at most once before + * re-throwing the last error. + */ +export async function withEndpointFailover( + pool: EndpointPool, + fn: (endpoint: string) => Promise, +): Promise { + let lastError: unknown; + for (let attempt = 0; attempt < pool.size(); attempt++) { + const endpoint = pool.current(); + try { + return await fn(endpoint); + } catch (error) { + lastError = error; + pool.rotate(error); + } + } + throw lastError; +} diff --git a/apps/price_pusher/src/injective/injective.ts b/apps/price_pusher/src/injective/injective.ts index df34c1aa9f..35876e850d 100644 --- a/apps/price_pusher/src/injective/injective.ts +++ b/apps/price_pusher/src/injective/injective.ts @@ -21,6 +21,7 @@ import type { Logger } from "pino"; import type { PriceItem, PriceInfo, IPricePusher } from "../interface.js"; import { ChainPriceListener } from "../interface.js"; import type { DurationInSeconds } from "../utils.js"; +import { EndpointPool, withEndpointFailover } from "./endpoint-pool.js"; const DEFAULT_GAS_PRICE = 160_000_000; const DEFAULT_GAS_MULTIPLIER = 1.05; @@ -48,9 +49,11 @@ type InjectiveConfig = { // this use price without leading 0x export class InjectivePriceListener extends ChainPriceListener { + private readonly endpoints: EndpointPool; + constructor( private pythContractAddress: string, - private grpcEndpoint: string, + grpcEndpoints: string | readonly string[], priceItems: PriceItem[], private logger: Logger, config: { @@ -58,6 +61,10 @@ export class InjectivePriceListener extends ChainPriceListener { }, ) { super(config.pollingFrequency, priceItems); + this.endpoints = new EndpointPool( + typeof grpcEndpoints === "string" ? [grpcEndpoints] : grpcEndpoints, + logger, + ); } async getOnChainPriceInfo( @@ -65,14 +72,20 @@ export class InjectivePriceListener extends ChainPriceListener { ): Promise { let priceQueryResponse: PriceQueryResponse; try { - const api = new ChainGrpcWasmApi(this.grpcEndpoint); - const { data } = await api.fetchSmartContractState( - this.pythContractAddress, - Buffer.from(`{"price_feed":{"id":"${priceId}"}}`).toString("base64"), + priceQueryResponse = await withEndpointFailover( + this.endpoints, + async (endpoint) => { + const api = new ChainGrpcWasmApi(endpoint); + const { data } = await api.fetchSmartContractState( + this.pythContractAddress, + Buffer.from(`{"price_feed":{"id":"${priceId}"}}`).toString( + "base64", + ), + ); + const json = Buffer.from(data).toString(); + return JSON.parse(json) as PriceQueryResponse; + }, ); - - const json = Buffer.from(data).toString(); - priceQueryResponse = JSON.parse(json); } catch (error) { this.logger.error(error, `Polling on-chain price for ${priceId} failed.`); return undefined; @@ -96,11 +109,12 @@ export class InjectivePricePusher implements IPricePusher { private mnemonic: string; private chainConfig: InjectiveConfig; private accounts: Record = {}; + private readonly endpoints: EndpointPool; constructor( private hermesClient: HermesClient, private pythContractAddress: string, - private grpcEndpoint: string, + grpcEndpoints: string | readonly string[], private logger: Logger, mnemonic: string, chainConfig?: Partial, @@ -114,6 +128,10 @@ export class InjectivePricePusher implements IPricePusher { chainConfig?.priceIdsProcessChunkSize ?? DEFAULT_PRICE_IDS_PROCESS_CHUNK_SIZE, }; + this.endpoints = new EndpointPool( + typeof grpcEndpoints === "string" ? [grpcEndpoints] : grpcEndpoints, + logger, + ); } private getWallet(index: number) { @@ -131,13 +149,15 @@ export class InjectivePricePusher implements IPricePusher { msg: Msgs, index: number, ): Promise { - const chainGrpcAuthApi = new ChainGrpcAuthApi(this.grpcEndpoint); const wallet = this.getWallet(index); const injectiveAddress = wallet.toAddress().toBech32(); // Fetch the latest account details only if it's not stored. - this.accounts[injectiveAddress] ??= - await chainGrpcAuthApi.fetchAccount(injectiveAddress); + this.accounts[injectiveAddress] ??= await withEndpointFailover( + this.endpoints, + (endpoint) => + new ChainGrpcAuthApi(endpoint).fetchAccount(injectiveAddress), + ); const account = this.accounts[injectiveAddress]; @@ -158,8 +178,9 @@ export class InjectivePricePusher implements IPricePusher { txRaw.signatures = [sig]; // this takes approx 5 seconds - const txResponse = await new TxGrpcApi(this.grpcEndpoint).broadcast( - txRaw, + const txResponse = await withEndpointFailover( + this.endpoints, + (endpoint) => new TxGrpcApi(endpoint).broadcast(txRaw), ); account.baseAccount.sequence++; @@ -272,8 +293,8 @@ export class InjectivePricePusher implements IPricePusher { }); try { - const result = await new TxGrpcApi(this.grpcEndpoint).simulate( - simulateTxRaw, + const result = await withEndpointFailover(this.endpoints, (endpoint) => + new TxGrpcApi(endpoint).simulate(simulateTxRaw), ); const gas = ( @@ -327,22 +348,24 @@ export class InjectivePricePusher implements IPricePusher { */ private async getUpdateFee(vaas: string[]) { try { - const api = new ChainGrpcWasmApi(this.grpcEndpoint); - const { data } = await api.fetchSmartContractState( - this.pythContractAddress, - Buffer.from( - JSON.stringify({ - get_update_fee: { - vaas, - }, - }), - ).toString("base64"), - ); - - const json = Buffer.from(data).toString(); - - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return JSON.parse(json); + return await withEndpointFailover(this.endpoints, async (endpoint) => { + const api = new ChainGrpcWasmApi(endpoint); + const { data } = await api.fetchSmartContractState( + this.pythContractAddress, + Buffer.from( + JSON.stringify({ + get_update_fee: { + vaas, + }, + }), + ).toString("base64"), + ); + + const json = Buffer.from(data).toString(); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return JSON.parse(json); + }); } catch (error) { this.logger.error(error, `Error fetching update fee.`); diff --git a/apps/price_pusher/tests/endpoint-pool.test.ts b/apps/price_pusher/tests/endpoint-pool.test.ts new file mode 100644 index 0000000000..62692c82ef --- /dev/null +++ b/apps/price_pusher/tests/endpoint-pool.test.ts @@ -0,0 +1,99 @@ +import { + EndpointPool, + withEndpointFailover, +} from "../src/injective/endpoint-pool.js"; + +const silentLogger = { + trace: () => undefined, + debug: () => undefined, + info: () => undefined, + warn: () => undefined, + error: () => undefined, + fatal: () => undefined, + silent: () => undefined, + level: "silent", + child: () => silentLogger, +} as unknown as Parameters[1] extends never + ? never + : ConstructorParameters[1]; + +describe("EndpointPool", () => { + it("rejects an empty endpoint list", () => { + expect(() => new EndpointPool([], silentLogger)).toThrow( + /at least one endpoint/, + ); + }); + + it("returns the first endpoint by default", () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + expect(pool.current()).toBe("a"); + expect(pool.size()).toBe(3); + }); + + it("rotates round-robin and wraps past the end", () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + pool.rotate(new Error("first failure")); + expect(pool.current()).toBe("b"); + pool.rotate(new Error("second failure")); + expect(pool.current()).toBe("c"); + pool.rotate(new Error("wrap")); + expect(pool.current()).toBe("a"); + }); + + it("treats rotate() as a no-op for a single-endpoint pool", () => { + const pool = new EndpointPool(["only"], silentLogger); + expect(pool.current()).toBe("only"); + pool.rotate(new Error("transient")); + expect(pool.current()).toBe("only"); + }); +}); + +describe("withEndpointFailover", () => { + it("returns the first successful result without rotating", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + const seen: string[] = []; + const result = await withEndpointFailover(pool, (endpoint) => { + seen.push(endpoint); + return Promise.resolve(endpoint.toUpperCase()); + }); + expect(result).toBe("A"); + expect(seen).toEqual(["a"]); + expect(pool.current()).toBe("a"); + }); + + it("rotates to the next endpoint on failure and returns its result", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + const seen: string[] = []; + const result = await withEndpointFailover(pool, (endpoint) => { + seen.push(endpoint); + if (endpoint === "a") return Promise.reject(new Error("a down")); + return Promise.resolve(endpoint); + }); + expect(result).toBe("b"); + expect(seen).toEqual(["a", "b"]); + expect(pool.current()).toBe("b"); + }); + + it("walks every endpoint at most once before re-throwing", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + const attempts: string[] = []; + await expect( + withEndpointFailover(pool, (endpoint) => { + attempts.push(endpoint); + return Promise.reject(new Error(`${endpoint} down`)); + }), + ).rejects.toThrow(/c down/); + expect(attempts).toEqual(["a", "b", "c"]); + }); + + it("starts from wherever the pool's cursor currently is", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + pool.rotate(new Error("prior cycle")); + const attempts: string[] = []; + await withEndpointFailover(pool, (endpoint) => { + attempts.push(endpoint); + return Promise.resolve(endpoint); + }); + expect(attempts).toEqual(["b"]); + }); +});