Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/price_pusher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ pnpm run start injective --grpc-endpoint https://grpc-endpoint.com \
[--pushing-frequency 10] \
[--polling-frequency 5]

# `--grpc-endpoint` accepts a fallback set. Pass the flag multiple times or
# supply a comma-separated list (`--grpc-endpoint a,b`). The pusher cycles
# round-robin through endpoints when a gRPC call fails — useful for cosmos
# nodes whose availability can be flaky.

# For Aptos
pnpm run start aptos --endpoint https://fullnode.testnet.aptoslabs.com/v1 \
--pyth-contract-address 0x7e783b349d3e89cf5931af376ebeadbfab855b3fa239b7ada8f5a92fbea6b387 \
Expand Down
9 changes: 7 additions & 2 deletions apps/price_pusher/package.json
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@
"default": "./dist/injective/command.cjs",
"types": "./dist/injective/command.d.ts"
},
"./injective/endpoint-pool": {
"default": "./dist/injective/endpoint-pool.cjs",
"types": "./dist/injective/endpoint-pool.d.ts"
},
"./injective/injective": {
"default": "./dist/injective/injective.cjs",
"types": "./dist/injective/injective.d.ts"
Expand Down Expand Up @@ -215,9 +219,10 @@
"dev": "ts-node src/index.ts",
"prepublishOnly": "pnpm run build",
"start": "node dist/index.cjs",
"test:types": "tsc"
"test:types": "tsc",
"test:unit": "test-unit"
},
"type": "module",
"types": "./dist/index.d.ts",
"version": "10.4.0"
"version": "10.5.0"
}
28 changes: 23 additions & 5 deletions apps/price_pusher/src/injective/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ export default {
builder: {
"grpc-endpoint": {
description:
"gRPC endpoint URL for injective. The pusher will periodically" +
"gRPC endpoint URL(s) for injective. The pusher will periodically " +
"poll for updates. The polling interval is configurable via the " +
"`polling-frequency` command-line argument.",
type: "string",
"`polling-frequency` command-line argument. " +
"Pass the flag multiple times or supply a comma-separated list " +
"(e.g. `--grpc-endpoint a,b`) to register a fallback set; the " +
"pusher will round-robin through them on gRPC errors.",
type: "array",
string: true,
required: true,
} as Options,
network: {
Expand Down Expand Up @@ -86,6 +90,20 @@ export default {
});
const mnemonic = fs.readFileSync(mnemonicFile, "utf8").trim();

// `grpc-endpoint` is `type: "array"` so yargs always hands us a list. Each
// entry may itself be a comma-separated string (e.g. `--grpc-endpoint a,b`),
// so split + trim + drop empties to get a clean failover set.
const grpcEndpoints: string[] = (grpcEndpoint as string[])
.flatMap((entry) => entry.split(","))
.map((entry) => entry.trim())
.filter((entry) => entry.length > 0);
if (grpcEndpoints.length === 0) {
throw new Error(
"At least one --grpc-endpoint must be provided. Pass the flag once per " +
"endpoint, or supply a comma-separated list.",
);
}

let priceItems = priceConfigs.map(({ id, alias }) => ({ id, alias }));

// Better to filter out invalid price items before creating the pyth listener
Expand All @@ -110,7 +128,7 @@ export default {

const injectiveListener = new InjectivePriceListener(
pythContractAddress,
grpcEndpoint,
grpcEndpoints,
priceItems,
logger.child({ module: "InjectivePriceListener" }),
{
Expand All @@ -120,7 +138,7 @@ export default {
const injectivePusher = new InjectivePricePusher(
hermesClient,
pythContractAddress,
grpcEndpoint,
grpcEndpoints,
logger.child({ module: "InjectivePricePusher" }),
mnemonic,
{
Expand Down
67 changes: 67 additions & 0 deletions apps/price_pusher/src/injective/endpoint-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { Logger } from "pino";

/**
* Round-robin cursor over a non-empty list of gRPC endpoints. Each call site
* rents the current endpoint, and the cursor advances when an attempt fails
* (see `withEndpointFailover`). The cursor is shared across every gRPC API
* helper inside a single pusher instance so that a bad endpoint affects the
* whole pusher, not just one method.
*/
export class EndpointPool {
private currentIndex = 0;

constructor(
private readonly endpoints: readonly string[],
private readonly logger: Logger,
) {
if (endpoints.length === 0) {
throw new Error("EndpointPool requires at least one endpoint");
}
}

current(): string {
// Non-null asserted: the constructor guarantees length >= 1 and `rotate`
// always wraps the index modulo `endpoints.length`.

return this.endpoints[this.currentIndex]!;
}

rotate(reason: unknown): string {
if (this.endpoints.length === 1) {
return this.current();
}
const failed = this.current();
this.currentIndex = (this.currentIndex + 1) % this.endpoints.length;
this.logger.warn(
{ failedEndpoint: failed, nextEndpoint: this.current(), err: reason },
"gRPC endpoint failed — rotating to next endpoint",
);
return this.current();
}

size(): number {
return this.endpoints.length;
}
}

/**
* Try `fn` against the current endpoint; on failure, rotate to the next
* endpoint and retry. Walks through every endpoint at most once before
* re-throwing the last error.
*/
export async function withEndpointFailover<T>(
pool: EndpointPool,
fn: (endpoint: string) => Promise<T>,
): Promise<T> {
let lastError: unknown;
for (let attempt = 0; attempt < pool.size(); attempt++) {
const endpoint = pool.current();
try {
return await fn(endpoint);
} catch (error) {
lastError = error;
pool.rotate(error);
}
}
throw lastError;
}
87 changes: 55 additions & 32 deletions apps/price_pusher/src/injective/injective.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type { Logger } from "pino";
import type { PriceItem, PriceInfo, IPricePusher } from "../interface.js";
import { ChainPriceListener } from "../interface.js";
import type { DurationInSeconds } from "../utils.js";
import { EndpointPool, withEndpointFailover } from "./endpoint-pool.js";

const DEFAULT_GAS_PRICE = 160_000_000;
const DEFAULT_GAS_MULTIPLIER = 1.05;
Expand Down Expand Up @@ -48,31 +49,43 @@ type InjectiveConfig = {

// this use price without leading 0x
export class InjectivePriceListener extends ChainPriceListener {
private readonly endpoints: EndpointPool;

constructor(
private pythContractAddress: string,
private grpcEndpoint: string,
grpcEndpoints: string | readonly string[],
priceItems: PriceItem[],
private logger: Logger,
config: {
pollingFrequency: DurationInSeconds;
},
) {
super(config.pollingFrequency, priceItems);
this.endpoints = new EndpointPool(
typeof grpcEndpoints === "string" ? [grpcEndpoints] : grpcEndpoints,
logger,
);
}

async getOnChainPriceInfo(
priceId: HexString,
): Promise<PriceInfo | undefined> {
let priceQueryResponse: PriceQueryResponse;
try {
const api = new ChainGrpcWasmApi(this.grpcEndpoint);
const { data } = await api.fetchSmartContractState(
this.pythContractAddress,
Buffer.from(`{"price_feed":{"id":"${priceId}"}}`).toString("base64"),
priceQueryResponse = await withEndpointFailover(
this.endpoints,
async (endpoint) => {
const api = new ChainGrpcWasmApi(endpoint);
const { data } = await api.fetchSmartContractState(
this.pythContractAddress,
Buffer.from(`{"price_feed":{"id":"${priceId}"}}`).toString(
"base64",
),
);
const json = Buffer.from(data).toString();
return JSON.parse(json) as PriceQueryResponse;
},
);

const json = Buffer.from(data).toString();
priceQueryResponse = JSON.parse(json);
} catch (error) {
this.logger.error(error, `Polling on-chain price for ${priceId} failed.`);
return undefined;
Expand All @@ -96,11 +109,12 @@ export class InjectivePricePusher implements IPricePusher {
private mnemonic: string;
private chainConfig: InjectiveConfig;
private accounts: Record<string, Account | undefined> = {};
private readonly endpoints: EndpointPool;

constructor(
private hermesClient: HermesClient,
private pythContractAddress: string,
private grpcEndpoint: string,
grpcEndpoints: string | readonly string[],
private logger: Logger,
mnemonic: string,
chainConfig?: Partial<InjectiveConfig>,
Expand All @@ -114,6 +128,10 @@ export class InjectivePricePusher implements IPricePusher {
chainConfig?.priceIdsProcessChunkSize ??
DEFAULT_PRICE_IDS_PROCESS_CHUNK_SIZE,
};
this.endpoints = new EndpointPool(
typeof grpcEndpoints === "string" ? [grpcEndpoints] : grpcEndpoints,
logger,
);
}

private getWallet(index: number) {
Expand All @@ -131,13 +149,15 @@ export class InjectivePricePusher implements IPricePusher {
msg: Msgs,
index: number,
): Promise<TxResponse> {
const chainGrpcAuthApi = new ChainGrpcAuthApi(this.grpcEndpoint);
const wallet = this.getWallet(index);
const injectiveAddress = wallet.toAddress().toBech32();

// Fetch the latest account details only if it's not stored.
this.accounts[injectiveAddress] ??=
await chainGrpcAuthApi.fetchAccount(injectiveAddress);
this.accounts[injectiveAddress] ??= await withEndpointFailover(
this.endpoints,
(endpoint) =>
new ChainGrpcAuthApi(endpoint).fetchAccount(injectiveAddress),
);

const account = this.accounts[injectiveAddress];

Expand All @@ -158,8 +178,9 @@ export class InjectivePricePusher implements IPricePusher {
txRaw.signatures = [sig];

// this takes approx 5 seconds
const txResponse = await new TxGrpcApi(this.grpcEndpoint).broadcast(
txRaw,
const txResponse = await withEndpointFailover(
this.endpoints,
(endpoint) => new TxGrpcApi(endpoint).broadcast(txRaw),
);

account.baseAccount.sequence++;
Expand Down Expand Up @@ -272,8 +293,8 @@ export class InjectivePricePusher implements IPricePusher {
});

try {
const result = await new TxGrpcApi(this.grpcEndpoint).simulate(
simulateTxRaw,
const result = await withEndpointFailover(this.endpoints, (endpoint) =>
new TxGrpcApi(endpoint).simulate(simulateTxRaw),
);

const gas = (
Expand Down Expand Up @@ -327,22 +348,24 @@ export class InjectivePricePusher implements IPricePusher {
*/
private async getUpdateFee(vaas: string[]) {
try {
const api = new ChainGrpcWasmApi(this.grpcEndpoint);
const { data } = await api.fetchSmartContractState(
this.pythContractAddress,
Buffer.from(
JSON.stringify({
get_update_fee: {
vaas,
},
}),
).toString("base64"),
);

const json = Buffer.from(data).toString();

// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return JSON.parse(json);
return await withEndpointFailover(this.endpoints, async (endpoint) => {
const api = new ChainGrpcWasmApi(endpoint);
const { data } = await api.fetchSmartContractState(
this.pythContractAddress,
Buffer.from(
JSON.stringify({
get_update_fee: {
vaas,
},
}),
).toString("base64"),
);

const json = Buffer.from(data).toString();

// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return JSON.parse(json);
});
} catch (error) {
this.logger.error(error, `Error fetching update fee.`);

Expand Down
Loading