From 992c9f42ad8d1b04611646ca2a9e9fbba2bb996c Mon Sep 17 00:00:00 2001 From: wayne Date: Fri, 13 Feb 2026 12:52:04 +0000 Subject: [PATCH 1/7] getChainAssetsChart V2 --- defi/l2/v2/storeToDb.ts | 108 ++++++++++++++++++++++---------- defi/src/getChainAssetsChart.ts | 7 ++- 2 files changed, 80 insertions(+), 35 deletions(-) diff --git a/defi/l2/v2/storeToDb.ts b/defi/l2/v2/storeToDb.ts index 92e0fa2360..2a11eda848 100644 --- a/defi/l2/v2/storeToDb.ts +++ b/defi/l2/v2/storeToDb.ts @@ -35,7 +35,11 @@ export async function storeHistoricalToDB(res: { timestamp: number; value: any } sql.end(); } -export async function fetchHistoricalFromDB(chain: string | undefined = undefined, isRaw: boolean = false) { +export async function fetchHistoricalFromDB( + chain: string | undefined = undefined, + isRaw: boolean = false, + breakdown: boolean = false +) { const sql = await iniDbConnection(); const timestamps = await queryPostgresWithRetry(sql`select timestamp from chainassets2`, sql); @@ -50,28 +54,68 @@ export async function fetchHistoricalFromDB(chain: string | undefined = undefine ? dailyData.map((d: any) => ({ timestamp: d.timestamp, [chain]: JSON.parse(d.value)[chain] })) : dailyData.map((d: any) => ({ timestamp: d.timestamp, ...JSON.parse(d.value) })); + data.sort((a: any, b: any) => a.timestamp - b.timestamp); if (isRaw) return data; + if (!breakdown) { + const totalsData: any[] = []; + data.map((d: any) => { + const totalsEntry: any = { timestamp: d.timestamp, data: {} }; + Object.keys(d).forEach((c: string) => { + if (!c || c == "timestamp") return; + if (chain) { + Object.keys(d[chain]).map((section) => { + totalsEntry.data[section] = d[chain][section].total; + }); + } else { + const readableChain = getChainDisplayName(c, true); + totalsEntry.data[readableChain] = {}; + Object.keys(d[c]).map((section) => { + totalsEntry.data[readableChain][section] = d[c][section].total; + }); + } + }); + + totalsData.push(totalsEntry); + }); + + return totalsData; + } + const symbolMap: { [key: string]: string } = await getR2JSONString("chainAssetsSymbolMap"); const symbolData: any[] = []; data.map((d: any) => { - const symbolEntry: any = { timestamp: d.timestamp }; - Object.keys(d).forEach((chain: string) => { - if (chain == "timestamp") return; - const readableChain = getChainDisplayName(chain, true); - symbolEntry[readableChain] = {}; - Object.keys(d[chain]).map((section) => { - symbolEntry[readableChain][section] = { total: d[chain][section].total, breakdown: {} }; - Object.keys(d[chain][section].breakdown).forEach((asset: string) => { - if (!symbolMap[asset]) { - console.log(`${asset} not found in symbolMap`); - return; - } - symbolEntry[readableChain][section].breakdown[symbolMap[asset]] = d[chain][section].breakdown[asset]; + const symbolEntry: any = { timestamp: d.timestamp, data: {} }; + Object.keys(d).forEach((c: string) => { + if (!c || c == "timestamp") return; + if (chain) { + Object.keys(d[chain]).map((section) => { + symbolEntry.data[section] = { total: d[c][section].total, breakdown: {} }; + Object.keys(d[c][section].breakdown ?? {}).forEach((asset: string) => { + if (!symbolMap[asset]) { + // console.log(`${asset} not found in symbolMap`); + return; + } + symbolEntry.data[section].breakdown[symbolMap[asset]] = d[c][section].breakdown[asset]; + }); }); - }); + } else { + const readableChain = getChainDisplayName(c, true); + symbolEntry.data[readableChain] = {}; + Object.keys(d[c]).map((section) => { + symbolEntry.data[readableChain][section] = { total: d[c][section].total, breakdown: {} }; + Object.keys(d[c][section].breakdown ?? {}).forEach((asset: string) => { + if (!symbolMap[asset]) { + // console.log(`${asset} not found in symbolMap`); + return; + } + symbolEntry.data[readableChain][section].breakdown[symbolMap[asset]] = d[c][section].breakdown[asset]; + }); + }); + } }); + symbolData.push(symbolEntry); }); @@ -79,27 +123,27 @@ export async function fetchHistoricalFromDB(chain: string | undefined = undefine } export async function fetchChartData(chain: string) { - const allHistorical = await fetchHistoricalFromDB(chain) - const chartData: any[] = [] - allHistorical.map((h: any) => { - const entry: any = { timestamp: h.timestamp } - Object.keys(h).map((chain: string) => { - if (chain == "timestamp") return; - const totalsOnly: { [key: string]: string } = {} - Object.keys(h[chain]).map((section: string) => { - totalsOnly[section] = h[chain][section].total - }) - entry[chain] = totalsOnly - }) - chartData.push(entry) - }) - return chartData + const allHistorical = await fetchHistoricalFromDB(chain); + const chartData: any[] = []; + allHistorical.map((h: any) => { + const entry: any = { timestamp: h.timestamp }; + Object.keys(h).map((chain: string) => { + if (chain == "timestamp") return; + const totalsOnly: { [key: string]: string } = {}; + Object.keys(h[chain]).map((section: string) => { + totalsOnly[section] = h[chain][section].total; + }); + entry[chain] = totalsOnly; + }); + chartData.push(entry); + }); + return chartData; } export function findDailyTimestamps(timestamps: { timestamp: number }[]): number[] { + timestamps.sort((a, b) => a.timestamp - b.timestamp); const end = getTimestampAtStartOfDay(getCurrentUnixTimestamp()); - - const start = getTimestampAtStartOfDay(timestamps[timestamps.length - 1].timestamp + secondsInADay); + const start = getTimestampAtStartOfDay(Number(timestamps[0].timestamp) + secondsInADay); const dailyTimestamps = [timestamps[timestamps.length - 1].timestamp]; for (let i = start; i < end; i += secondsInADay) { diff --git a/defi/src/getChainAssetsChart.ts b/defi/src/getChainAssetsChart.ts index 74d73c04d4..2bcc094d52 100644 --- a/defi/src/getChainAssetsChart.ts +++ b/defi/src/getChainAssetsChart.ts @@ -1,13 +1,14 @@ import { wrap, IResponse, successResponse, errorResponse } from "./utils/shared"; -import { fetchHistoricalFromDB } from "../l2/storeToDb"; +import { fetchHistoricalFromDB } from "../l2/v2/storeToDb"; import setEnvSecrets from "./utils/shared/setEnvSecrets"; +import { getChainIdFromDisplayName } from "../src/utils/normalizeChain"; const handler = async (event: any): Promise => { try { const chainParam = event.pathParameters?.chain; - const chain = chainParam.replace("%20", " "); + const chain = getChainIdFromDisplayName(chainParam.replace("%20", " ")); await setEnvSecrets(); - const chains = await fetchHistoricalFromDB(chain); + const chains = await fetchHistoricalFromDB(chain, false, false); return successResponse(chains, 10 * 60); // 10 min cache } catch (e: any) { return errorResponse({ message: e.message }); From 5293627f45e94be4fd4020dc485099cdfb8e6d0a Mon Sep 17 00:00:00 2001 From: wayne Date: Fri, 13 Feb 2026 12:54:52 +0000 Subject: [PATCH 2/7] current --- defi/src/getChainAssets.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/defi/src/getChainAssets.ts b/defi/src/getChainAssets.ts index bd8c069929..487ecf5c41 100644 --- a/defi/src/getChainAssets.ts +++ b/defi/src/getChainAssets.ts @@ -3,7 +3,7 @@ import { wrap, IResponse, successResponse, errorResponse } from "./utils/shared" const handler = async (_event: any): Promise => { try { - const chains = await getR2(`chainAssets`).then((res) => JSON.parse(res.body!)); + const chains = await getR2(`chainAssets2`).then((res) => JSON.parse(res.body!)); return successResponse(chains, 10 * 60); // 10 min cache } catch (e: any) { return errorResponse({ message: e.message }); From acca2ea41d6c698fd365ed3f16e5a60014d37188 Mon Sep 17 00:00:00 2001 From: wayne Date: Fri, 13 Feb 2026 14:21:28 +0000 Subject: [PATCH 3/7] flows on v1 --- defi/l2/v2/storeToDb.ts | 39 +-------------------------------------- 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/defi/l2/v2/storeToDb.ts b/defi/l2/v2/storeToDb.ts index 2a11eda848..569f885ec6 100644 --- a/defi/l2/v2/storeToDb.ts +++ b/defi/l2/v2/storeToDb.ts @@ -75,7 +75,7 @@ export async function fetchHistoricalFromDB( }); } }); - + totalsData.push(totalsEntry); }); @@ -155,43 +155,6 @@ export function findDailyTimestamps(timestamps: { timestamp: number }[]): number return dailyTimestamps; } -export async function fetchFlows(period: number = secondsInADay) { - const sql = await iniDbConnection(); - const targetEnd = getCurrentUnixTimestamp(); - const targetStart = targetEnd - period * 1.5; - const datas: any[] = await queryPostgresWithRetry( - sql`select * from chainassets2 where timestamp > ${targetStart}`, - sql - ); - - const actualStart = datas.reduce((prev, curr) => - Math.abs(curr.timestamp - targetStart) < Math.abs(prev.timestamp - targetStart) ? curr : prev - ); - const actualEnd = datas.reduce((prev, curr) => - Math.abs(curr.timestamp - targetEnd) < Math.abs(prev.timestamp - targetEnd) ? curr : prev - ); - - const startData = JSON.parse(datas.find((d) => d.timestamp == actualStart.timestamp)?.value ?? "{}"); - const endData = JSON.parse(datas.find((d) => d.timestamp == actualEnd.timestamp)?.value ?? "{}"); - - const flows: any = {}; - Object.keys(endData).map((chain) => { - const readableChain = getChainDisplayName(chain, true); - flows[readableChain] = {}; - Object.keys(endData[chain]).map((section) => { - flows[readableChain][section] = {}; - const startValue = startData[chain]?.[section]?.total ?? 0; - const endValue = endData[chain][section].total; - const raw = endValue - startValue; - const perc = (raw / startValue) * 100; - flows[readableChain][section].raw = raw; - flows[readableChain][section].perc = perc; - }); - }); - - return flows; -} - export async function fetchCurrentChainAssets() { const res = await getR2JSONString("chainAssets2"); From 2c7c96471be4692a8fa05bf924acf033840d18eb Mon Sep 17 00:00:00 2001 From: wayne Date: Tue, 31 Mar 2026 16:23:44 +0100 Subject: [PATCH 4/7] cached --- defi/l2/v2/storeToDb.ts | 148 +++++++++++++++++++++++++++------------- 1 file changed, 100 insertions(+), 48 deletions(-) diff --git a/defi/l2/v2/storeToDb.ts b/defi/l2/v2/storeToDb.ts index 569f885ec6..4533b9eed4 100644 --- a/defi/l2/v2/storeToDb.ts +++ b/defi/l2/v2/storeToDb.ts @@ -2,12 +2,16 @@ import postgres from "postgres"; import { queryPostgresWithRetry } from "../../src/utils/shared/bridgedTvlPostgres"; -import { getCurrentUnixTimestamp, getTimestampAtStartOfDay } from "../../src/utils/date"; import { getR2JSONString } from "../../src/utils/r2"; import { getChainDisplayName } from "../../src/utils/normalizeChain"; +import { + storeChainHistory, + readChainHistory, + storeAllChainsHistory, + readAllChainsHistory, +} from "./file-cache"; let auth: string[] = []; -const secondsInADay = 86400; const columns: any = ["timestamp", "value"]; async function iniDbConnection() { auth = process.env.COINS2_AUTH?.split(",") ?? []; @@ -30,29 +34,109 @@ export async function storeHistoricalToDB(res: { timestamp: number; value: any } `, sql ); + sql.end(); + + await precomputeHistoricalCache(); +} - // delete old hourly datas here?? +// precompute all-chains and per-chain totals into file cache after each DB store +async function precomputeHistoricalCache() { + const sql = await iniDbConnection(); + const rows = await queryPostgresWithRetry( + sql` + select distinct on (date_trunc('day', to_timestamp(timestamp))) + timestamp, value + from chainassets2 + order by date_trunc('day', to_timestamp(timestamp)) asc, timestamp desc + `, + sql + ); sql.end(); + + rows.sort((a: any, b: any) => a.timestamp - b.timestamp); + + const allChainsTotals: any[] = []; + const perChainTotals: { [chain: string]: any[] } = {}; + + rows.map((row: any) => { + const value = JSON.parse(row.value); + const allEntry: any = { timestamp: row.timestamp, data: {} }; + + Object.keys(value).map((c: string) => { + if (!value[c]) return; + const readableChain = getChainDisplayName(c, true); + allEntry.data[readableChain] = {}; + Object.keys(value[c]).map((section: string) => { + allEntry.data[readableChain][section] = value[c][section]?.total; + }); + + if (!perChainTotals[c]) perChainTotals[c] = []; + const chainEntry: any = { timestamp: row.timestamp, data: {} }; + Object.keys(value[c]).map((section: string) => { + chainEntry.data[section] = value[c][section]?.total; + }); + perChainTotals[c].push(chainEntry); + }); + + allChainsTotals.push(allEntry); + }); + + await Promise.all([ + storeAllChainsHistory(allChainsTotals), + ...Object.keys(perChainTotals).map((chain) => storeChainHistory(chain, perChainTotals[chain])), + ]); } export async function fetchHistoricalFromDB( chain: string | undefined = undefined, isRaw: boolean = false, breakdown: boolean = false +) { + // serve from file cache for totals (the common API case) + if (!isRaw && !breakdown) { + const cached = chain ? await readChainHistory(chain) : await readAllChainsHistory(); + if (cached && Array.isArray(cached) && cached.length > 0) return cached; + } + + // fallback to DB for cache misses, raw, or breakdown queries + return fetchHistoricalFromDBDirect(chain, isRaw, breakdown); +} + +async function fetchHistoricalFromDBDirect( + chain: string | undefined = undefined, + isRaw: boolean = false, + breakdown: boolean = false ) { const sql = await iniDbConnection(); - const timestamps = await queryPostgresWithRetry(sql`select timestamp from chainassets2`, sql); - const dailyEntries = findDailyTimestamps(timestamps); - const dailyData = await queryPostgresWithRetry( - sql`select * from chainassets2 where timestamp in ${sql(dailyEntries)}`, - sql - ); + const allData = chain + ? await queryPostgresWithRetry( + sql` + select timestamp, value::jsonb->${chain} as chain_value + from ( + select distinct on (date_trunc('day', to_timestamp(timestamp))) + timestamp, value + from chainassets2 + order by date_trunc('day', to_timestamp(timestamp)) asc, timestamp desc + ) daily + order by timestamp asc + `, + sql + ) + : await queryPostgresWithRetry( + sql` + select distinct on (date_trunc('day', to_timestamp(timestamp))) + timestamp, value + from chainassets2 + order by date_trunc('day', to_timestamp(timestamp)) asc, timestamp desc + `, + sql + ); sql.end(); const data = chain - ? dailyData.map((d: any) => ({ timestamp: d.timestamp, [chain]: JSON.parse(d.value)[chain] })) - : dailyData.map((d: any) => ({ timestamp: d.timestamp, ...JSON.parse(d.value) })); + ? allData.filter((d: any) => d.chain_value != null).map((d: any) => ({ timestamp: d.timestamp, [chain]: d.chain_value })) + : allData.map((d: any) => ({ timestamp: d.timestamp, ...JSON.parse(d.value) })); data.sort((a: any, b: any) => a.timestamp - b.timestamp); if (isRaw) return data; @@ -64,6 +148,7 @@ export async function fetchHistoricalFromDB( Object.keys(d).forEach((c: string) => { if (!c || c == "timestamp") return; if (chain) { + if (!d[chain]) return; Object.keys(d[chain]).map((section) => { totalsEntry.data[section] = d[chain][section].total; }); @@ -90,13 +175,11 @@ export async function fetchHistoricalFromDB( Object.keys(d).forEach((c: string) => { if (!c || c == "timestamp") return; if (chain) { + if (!d[chain]) return; Object.keys(d[chain]).map((section) => { symbolEntry.data[section] = { total: d[c][section].total, breakdown: {} }; Object.keys(d[c][section].breakdown ?? {}).forEach((asset: string) => { - if (!symbolMap[asset]) { - // console.log(`${asset} not found in symbolMap`); - return; - } + if (!symbolMap[asset]) return; symbolEntry.data[section].breakdown[symbolMap[asset]] = d[c][section].breakdown[asset]; }); }); @@ -106,10 +189,7 @@ export async function fetchHistoricalFromDB( Object.keys(d[c]).map((section) => { symbolEntry.data[readableChain][section] = { total: d[c][section].total, breakdown: {} }; Object.keys(d[c][section].breakdown ?? {}).forEach((asset: string) => { - if (!symbolMap[asset]) { - // console.log(`${asset} not found in symbolMap`); - return; - } + if (!symbolMap[asset]) return; symbolEntry.data[readableChain][section].breakdown[symbolMap[asset]] = d[c][section].breakdown[asset]; }); }); @@ -123,37 +203,9 @@ export async function fetchHistoricalFromDB( } export async function fetchChartData(chain: string) { - const allHistorical = await fetchHistoricalFromDB(chain); - const chartData: any[] = []; - allHistorical.map((h: any) => { - const entry: any = { timestamp: h.timestamp }; - Object.keys(h).map((chain: string) => { - if (chain == "timestamp") return; - const totalsOnly: { [key: string]: string } = {}; - Object.keys(h[chain]).map((section: string) => { - totalsOnly[section] = h[chain][section].total; - }); - entry[chain] = totalsOnly; - }); - chartData.push(entry); - }); - return chartData; + return fetchHistoricalFromDB(chain); } -export function findDailyTimestamps(timestamps: { timestamp: number }[]): number[] { - timestamps.sort((a, b) => a.timestamp - b.timestamp); - const end = getTimestampAtStartOfDay(getCurrentUnixTimestamp()); - const start = getTimestampAtStartOfDay(Number(timestamps[0].timestamp) + secondsInADay); - const dailyTimestamps = [timestamps[timestamps.length - 1].timestamp]; - - for (let i = start; i < end; i += secondsInADay) { - const timestamp = timestamps.reduce((prev, curr) => - Math.abs(curr.timestamp - i) < Math.abs(prev.timestamp - i) ? curr : prev - ); - dailyTimestamps.push(timestamp.timestamp); - } - return dailyTimestamps; -} export async function fetchCurrentChainAssets() { const res = await getR2JSONString("chainAssets2"); From 76c639d6b317d40b3e0e35cd32badc69915a559e Mon Sep 17 00:00:00 2001 From: wayne Date: Fri, 17 Apr 2026 11:19:38 +0100 Subject: [PATCH 5/7] file cache --- defi/l2/v2/file-cache.ts | 61 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 defi/l2/v2/file-cache.ts diff --git a/defi/l2/v2/file-cache.ts b/defi/l2/v2/file-cache.ts new file mode 100644 index 0000000000..43b516c70a --- /dev/null +++ b/defi/l2/v2/file-cache.ts @@ -0,0 +1,61 @@ +import fs from "fs"; +import path from "path"; + +const CACHE_VERSION = "v1.0"; +const CACHE_DIR = process.env.CHAIN_ASSETS_CACHE_DIR || path.join(__dirname, ".chain-assets-cache"); +const VERSIONED_CACHE_DIR = path.join(CACHE_DIR, CACHE_VERSION); + +const pathExistsMap: { [key: string]: Promise } = {}; + +async function ensureDirExists(folder: string): Promise { + if (!pathExistsMap[folder]) { + pathExistsMap[folder] = (async () => { + try { + await fs.promises.access(folder); + } catch { + try { + await fs.promises.mkdir(folder, { recursive: true }); + } catch (e) { + console.error("Error creating directory:", (e as any)?.message); + } + } + })(); + } + return pathExistsMap[folder]; +} + +async function storeData(subPath: string, data: any): Promise { + const filePath = path.join(VERSIONED_CACHE_DIR, subPath); + await ensureDirExists(path.dirname(filePath)); + try { + await fs.promises.writeFile(filePath, JSON.stringify(data)); + } catch (e) { + console.error(`Error storing cache ${filePath}:`, (e as any)?.message); + } +} + +async function readData(subPath: string): Promise { + const filePath = path.join(VERSIONED_CACHE_DIR, subPath); + try { + const raw = await fs.promises.readFile(filePath, "utf8"); + return JSON.parse(raw); + } catch { + return null; + } +} + +export async function storeChainHistory(chain: string, data: any[]): Promise { + await storeData(`history/${chain}.json`, data); +} + +export async function readChainHistory(chain: string): Promise { + return readData(`history/${chain}.json`); +} + +export async function storeAllChainsHistory(data: any[]): Promise { + await storeData("history/all.json", data); +} + +export async function readAllChainsHistory(): Promise { + return readData("history/all.json"); +} From ed2446faec4c049f0e4ce36a2afe98b8328cf6bc Mon Sep 17 00:00:00 2001 From: wayne Date: Wed, 22 Apr 2026 16:56:55 +0100 Subject: [PATCH 6/7] fix(chain-assets V2): stellar / cosmos-bridge / aptos data-loss, plus verify & cache - stellar: uppercase asset_issuer in getStellarSupplies (Horizon is case-sensitive on issuer); dedupe by (code, issuer) so versioned ES variants don't double-count; add stellar to chainsWithCaseSensitiveDataProviders. Result: stellar data restored (was total=0); USDC $240M matches Horizon supply. - aptos: fall back to public fullnode when APTOS_RPC is unset so per-token supply fetches succeed; recovers ~\$1.69B of aptos TVL that was dropping to zero. - canonical / protocol bridges (noble, xion, echelon_initia, inertia): after fetchOutgoingAmountsFromDB, batch-resolve any bridge-emitted keys missing from allPrices via coins.getPrices. Adds a symbol-keyed fallback for adapters that emit plain symbols. Lowercase source-chain key in fetchOutgoingAmountsFromDB so sourceChainAmounts aligns with allChainKeys. - verify: new verifyChangesV2 that compares against the latest chainassets2 row (instead of the V1 /chain-assets/chains API), adds a "regressed to zero" check with a \$100k floor, and skips the drift check when prev is >6h stale. V1 verifyChanges kept unchanged for V1. - read-path: warm the file cache on DB-direct hits (25s cold -> <50ms warm), add a null-chain guard to fetchHistoricalFromDBDirect, and fix a d[c]/d[chain] alias in the single-chain breakdown branch. - yields/stablecoins endpoints: prefer pro-api.llama.fi/\${INTERNAL_API_KEY} when the key is set, with graceful degradation when upstream returns a non-array (e.g. 402 paywall). - storeChainAssetsV2 now takes an optional dryRun flag that skips all prod writes; used for local end-to-end verification. Co-Authored-By: Claude Opus 4.7 (1M context) --- defi/l2/utils.ts | 51 ++++++++++-- defi/l2/v2/index.ts | 128 ++++++++++++++++++++++++----- defi/l2/v2/storeToDb.ts | 26 ++++-- defi/l2/v2/verifyChanges.ts | 65 +++++++++++++++ defi/src/utils/shared/constants.ts | 2 +- 5 files changed, 237 insertions(+), 35 deletions(-) create mode 100644 defi/l2/v2/verifyChanges.ts diff --git a/defi/l2/utils.ts b/defi/l2/utils.ts index 018516647b..849b82ef11 100644 --- a/defi/l2/utils.ts +++ b/defi/l2/utils.ts @@ -114,7 +114,10 @@ async function getOsmosisSupplies(tokens: string[], timestamp?: number): Promise async function getAptosSupplies(tokens: string[], timestamp?: number): Promise<{ [token: string]: number }> { if (timestamp) throw new Error(`timestamp incompatible with Aptos adapter!`); const supplies: { [token: string]: number } = {}; - const rpc = process.env.APTOS_RPC; + // Public fullnode is rate-limited but keeps the pipeline working when + // APTOS_RPC isn't set (e.g. local dev). Prod should still set APTOS_RPC + // to a dedicated endpoint. + const rpc = process.env.APTOS_RPC || "https://fullnode.mainnet.aptoslabs.com"; await PromisePool.withConcurrency(1) .for(tokens) @@ -294,14 +297,44 @@ async function getStellarSupplies(tokens: string[], timestamp?: number): Promise if (timestamp) throw new Error(`timestamp incompatible with Stellar adapter!`); const supplies: { [token: string]: number } = {}; + // ES stores versioned variants of the same Stellar asset (e.g. + // "blnd-gdjehtbe...-1" in addition to "blnd-gdjehtbe..."). Both hit the + // same Horizon record, and both would otherwise be summed by symbol + // downstream — doubling the USDC / USDY / etc. figures for stellar. + // Dedupe by (code, issuer) pair, keeping the first-seen rawToken as the + // canonical key we store the supply under. + const seen = new Map(); + const dedupedTokens: string[] = []; + for (const rawToken of tokens) { + const upper = rawToken.toUpperCase(); + let canonicalKey: string; + if (isSorobanContractId(upper)) { + canonicalKey = upper; + } else { + const classicKey = stellarSacToClassic[upper] ?? upper; + // base (code, issuer); issuer is the trailing 56-char G-prefixed string; + // anything after that (e.g. "-1", "-2") is a versioning artifact. + const m = classicKey.match(/^(.+-[A-Z2-7]{55})(?:-\d+)?$/); + canonicalKey = m ? m[1] : classicKey; + } + if (seen.has(canonicalKey)) continue; + seen.set(canonicalKey, rawToken); + dedupedTokens.push(rawToken); + } + await PromisePool.withConcurrency(3) - .for(tokens) - .process(async (token) => { + .for(dedupedTokens) + .process(async (rawToken) => { try { + // Upstream (ES whitelist) stores stellar keys lowercased. Stellar asset + // issuers and Soroban contract IDs are G/C-prefixed base32 strings that + // are case-sensitive on Horizon — re-uppercase before querying. + const token = rawToken.toUpperCase(); + // Native Soroban contracts: call total_supply() via RPC if (isSorobanContractId(token)) { const supply = await getSorobanTokenSupply(token); - if (supply != null && supply > BigInt(0)) supplies[`stellar:${token}`] = Number(supply); + if (supply != null && supply > BigInt(0)) supplies[`stellar:${rawToken}`] = Number(supply); return; } @@ -309,11 +342,13 @@ async function getStellarSupplies(tokens: string[], timestamp?: number): Promise const classicKey = stellarSacToClassic[token] ?? token; if (classicKey === "XLM") return; // native asset handled by ownTokens - // Token format: "{asset_code}-{asset_issuer}" (dash-separated) + // Token format: "{asset_code}-{asset_issuer}" (dash-separated). + // Horizon is case-insensitive for asset_code but case-sensitive for + // asset_issuer. const dashIdx = classicKey.lastIndexOf("-"); if (dashIdx === -1) return; const asset_code = classicKey.substring(0, dashIdx); - const asset_issuer = classicKey.substring(dashIdx + 1); + const asset_issuer = classicKey.substring(dashIdx + 1).toUpperCase(); const res = await fetch( `https://horizon.stellar.org/assets?asset_code=${asset_code}&asset_issuer=${asset_issuer}&limit=1` ).then((r) => r.json()); @@ -321,7 +356,9 @@ async function getStellarSupplies(tokens: string[], timestamp?: number): Promise if (record?.balances?.authorized != null) { // Horizon exposes amount in display units with 7 implicit decimal places. // Multiply by 1e7 to align with decimals=7 returned by the price API. - supplies[`stellar:${token}`] = Math.round(parseFloat(record.balances.authorized) * 1e7); + // Key the supply under the original (lowercase) token so it matches + // the keys coins.getPrices returned. + supplies[`stellar:${rawToken}`] = Math.round(parseFloat(record.balances.authorized) * 1e7); } } catch (e) {} }); diff --git a/defi/l2/v2/index.ts b/defi/l2/v2/index.ts index e845f04918..ab9cabf70f 100644 --- a/defi/l2/v2/index.ts +++ b/defi/l2/v2/index.ts @@ -21,7 +21,7 @@ import { additional, excluded } from "../adapters/manual"; import { storeHistoricalToDB } from "./storeToDb"; import { stablecoins } from "../../src/getProtocols"; import { metadata as rwaMetadata } from "../../src/rwa/protocols"; -import { verifyChanges } from "../verifyChanges"; +import { verifyChangesV2 } from "./verifyChanges"; import { initializePriceQueryFilter, whitelistedTokenSetRawPids } from "../../src/storeTvlInterval/computeTVL"; import { getClosestProtocolItem, getLatestProtocolItems, initializeTVLCacheDB } from "../../src/api2/db"; import { hourlyRawTokensTvl } from "../../src/utils/getLastRecord"; @@ -217,21 +217,26 @@ async function fetchOutgoingAmountsFromDB(timestamp: number): Promise<{ const sourceChainAmounts: { [chain: Chain]: { [token: string]: BigNumber } } = {}; const protocolAmounts: { [chain: Chain]: { [token: string]: BigNumber } } = {}; const destinationChainAmounts: { [chain: Chain]: { [token: string]: BigNumber } } = {}; + tvls.map(({ data, id }: any) => { if (!ids.includes(id)) return; - Object.keys(data).map((chain: string) => { - if (excludedTvlKeys.includes(chain)) return; + Object.keys(data).map((rawChain: string) => { + if (excludedTvlKeys.includes(rawChain)) return; + // Some canonical-bridge adapters (xion, noble, echelon_initia, inertia, ...) + // report their source chain in display case ("XION", "Noble"), which would + // never match our lowercase chain keys when we later look up sourceChainAmounts[chain]. + const chain = rawChain.toLowerCase(); if (!sourceChainAmounts[chain]) sourceChainAmounts[chain] = {}; - Object.keys(data[chain]).map((token: string) => { + Object.keys(data[rawChain]).map((token: string) => { const key = normalizeKey(token); if (!sourceChainAmounts[chain][key]) sourceChainAmounts[chain][key] = zero; - sourceChainAmounts[chain][key] = sourceChainAmounts[chain][key].plus(data[chain][token]); + sourceChainAmounts[chain][key] = sourceChainAmounts[chain][key].plus(data[rawChain][token]); if (Object.keys(protocolBridgeIds).includes(id)) { const protocolSlug = protocolBridgeIds[id]; if (!protocolAmounts[protocolSlug]) protocolAmounts[protocolSlug] = {}; if (!protocolAmounts[protocolSlug][key]) protocolAmounts[protocolSlug][key] = zero; - protocolAmounts[protocolSlug][key] = protocolAmounts[protocolSlug][key].plus(data[chain][token]); + protocolAmounts[protocolSlug][key] = protocolAmounts[protocolSlug][key].plus(data[rawChain][token]); return; } @@ -240,7 +245,7 @@ async function fetchOutgoingAmountsFromDB(timestamp: number): Promise<{ if (!destinationChainAmounts[destinationChain]) destinationChainAmounts[destinationChain] = {}; if (!destinationChainAmounts[destinationChain][key]) destinationChainAmounts[destinationChain][key] = zero; destinationChainAmounts[destinationChain][key] = destinationChainAmounts[destinationChain][key].plus( - data[chain][token] + data[rawChain][token] ); }); }); @@ -292,22 +297,45 @@ async function fetchExcludedAmounts(timestamp: number) { return excludedAmounts; } +// The open endpoints (yields.llama.fi/lsdRates, stablecoins.llama.fi/stablecoins) +// are paywalled for some hosts. Use the authenticated pro-api path when +// INTERNAL_API_KEY is set — same base used throughout the rest of the codebase +// (see src/rwa/index.ts, src/updateSearch.ts, etc.). +function proApi(path: string, publicFallback: string): string { + const key = process.env.INTERNAL_API_KEY; + return key ? `https://pro-api.llama.fi/${key}/${path.replace(/^\//, "")}` : publicFallback; +} + // fetch stablecoin symbols async function fetchStablecoinSymbols() { - const { peggedAssets } = await cachedFetch({ - key: "stablecoin-symbols", - endpoint: "https://stablecoins.llama.fi/stablecoins", - }); - const symbols = peggedAssets.map((s: any) => s.symbol); + let symbols: string[] = []; + try { + const res: any = await cachedFetch({ + key: "stablecoin-symbols", + endpoint: proApi("stablecoins/stablecoins", "https://stablecoins.llama.fi/stablecoins"), + }); + if (Array.isArray(res?.peggedAssets)) symbols = res.peggedAssets.map((s: any) => s.symbol); + else console.warn("fetchStablecoinSymbols: unexpected response shape, falling back to hardcoded list"); + } catch (e: any) { + console.warn("fetchStablecoinSymbols failed, falling back to hardcoded list:", e?.message); + } const allSymbols = [...new Set([...symbols, ...stablecoins].map((t) => t.toUpperCase()))]; return allSymbols; } // fetch lst symbols async function fetchLstSymbols() { - const assets = await cachedFetch({ key: "lst-symbols", endpoint: "https://yields.llama.fi/lsdRates" }); - const symbols = assets.map((s: any) => s.symbol.toUpperCase()); - return symbols; + try { + const assets: any = await cachedFetch({ + key: "lst-symbols", + endpoint: proApi("yields/lsdRates", "https://yields.llama.fi/lsdRates"), + }); + if (Array.isArray(assets)) return assets.map((s: any) => s.symbol?.toUpperCase()).filter(Boolean); + console.warn("fetchLstSymbols: non-array response, skipping LST classification"); + } catch (e: any) { + console.warn("fetchLstSymbols failed, skipping LST classification:", e?.message); + } + return [] as string[]; } // fetch rwa symbols @@ -365,7 +393,7 @@ const newChainAssets = () => ({ }); // main function -export async function storeChainAssetsV2(override: boolean = false) { +export async function storeChainAssetsV2(override: boolean = false, dryRun: boolean = false) { const timestamp = 0; await fetchAllTokens(); const { sourceChainAmounts, protocolAmounts, destinationChainAmounts } = await fetchOutgoingAmountsFromDB(timestamp); @@ -376,6 +404,43 @@ export async function storeChainAssetsV2(override: boolean = false) { const lstSymbols = await fetchLstSymbols(); const rwaSymbols = fetchRwaSymbols(); + // Bridge / protocol-bridge adapters (noble, xion, echelon_initia, inertia, ...) + // emit tokens keyed by CoinGecko slug ("usd-coin", "xion-2", "celestia", "initia") + // or sometimes by bare symbol ("osmo", "axl"). `fetchNativeAndMcaps` only prices + // tokens it saw on per-chain native lists, so keys like `coingecko:usd-coin` + // are absent from allPrices. Collect everything the bridge-fetch step produced, + // pick out whatever allPrices is missing, and resolve it in one batch. + const allBridgeKeys = new Set(); + Object.values(destinationChainAmounts).forEach((m) => Object.keys(m).forEach((k) => allBridgeKeys.add(k))); + Object.values(protocolAmounts).forEach((m) => Object.keys(m).forEach((k) => allBridgeKeys.add(k))); + const missingBridgeKeys = [...allBridgeKeys].filter((k) => !allPrices[k]); + if (missingBridgeKeys.length) { + console.log(`[bridge-price-fill] resolving ${missingBridgeKeys.length} missing bridge/protocol keys via coins API`); + try { + const extra = await coins.getPrices(missingBridgeKeys, timestamp); + const got = Object.keys(extra).length; + console.log(`[bridge-price-fill] resolved ${got}/${missingBridgeKeys.length} (sample:`, Object.keys(extra).slice(0, 5), ")"); + Object.keys(extra).forEach((k) => { + if (k.startsWith("coingecko:")) (extra[k] as any).decimals = 0; // match native-side convention + allPrices[k] = extra[k]; + }); + } catch (e: any) { + console.warn("[bridge-price-fill] failed:", e?.message); + } + } + + // Some canonical-bridge adapters emit tokens keyed by uppercase symbol + // ("AXLUSDC", "ATOM"). normalizeKey turns those into "coingecko:axlusdc", which + // almost never matches a real coingecko id. Build a reverse symbol → pricing + // index from allPrices so those can still be resolved in the destination loop. + const symbolToPrice: { [symbol: string]: CoinsApiData } = {}; + Object.values(allPrices).forEach((p) => { + const sym = p?.symbol?.toUpperCase(); + if (!sym) return; + // Prefer the first resolution; symbol collisions are ambiguous by construction. + if (!symbolToPrice[sym]) symbolToPrice[sym] = p; + }); + // adjust native asset balances by excluded and outgoing amounts const nativeDataAfterDeductions: { [chain: Chain]: { [token: string]: BigNumber } } = {}; allChainKeys.map((chain: Chain) => { @@ -405,9 +470,23 @@ export async function storeChainAssetsV2(override: boolean = false) { const destinationChainAmount = destinationChainAmounts[chain]; Object.keys(destinationChainAmount).map((token: string) => { const key = normalizeKey(token); - const coinData = allPrices[key]; + let coinData = allPrices[key]; + // When the adapter was symbol-keyed, `key` looks like "coingecko:axlusdc" but + // there is no such coingecko id. Fall back to a symbol reverse lookup. + // Symbol-keyed adapter output is also typically in whole tokens, not base + // units, so we must skip the `/ 10^decimals` step for these. + let isWholeTokenAmount = false; + if ((!coinData || !coinData.price) && key.startsWith("coingecko:")) { + const symbol = key.slice("coingecko:".length).toUpperCase(); + const resolved = symbolToPrice[symbol]; + if (resolved?.price) { + coinData = resolved; + isWholeTokenAmount = true; + } + } if (!coinData || !coinData.price) return; - const usdAmount = destinationChainAmount[token].times(coinData.price).div(BigNumber(10).pow(coinData.decimals)); + const divisor = isWholeTokenAmount ? BigNumber(1) : BigNumber(10).pow(coinData.decimals); + const usdAmount = destinationChainAmount[token].times(coinData.price).div(divisor); nativeDataAfterDeductions[chain][key] = usdAmount; }); } @@ -492,8 +571,10 @@ export async function storeChainAssetsV2(override: boolean = false) { }); }); - // create symbol key data - const symbolMapPromise = storeR2JSONString("chainAssetsSymbolMap", JSON.stringify(symbolMap)); + // create symbol key data — only actually issue the R2 write when not in dry-run mode + const symbolMapPromise = dryRun + ? Promise.resolve() + : storeR2JSONString("chainAssetsSymbolMap", JSON.stringify(symbolMap)); [rawData, symbolData].map((allData) => { Object.keys(allData).map((chain: Chain) => { let totalTotal = zero; @@ -531,7 +612,12 @@ export async function storeChainAssetsV2(override: boolean = false) { }); }); - if (!override) await verifyChanges(symbolData); + if (!override) await verifyChangesV2(symbolData); + + if (dryRun) { + console.log("[dryRun] skipping prod writes (chainAssetsSymbolMap R2, chainassets2 DB, chainAssets2 R2)"); + return { rawData, symbolData }; + } await Promise.all([ symbolMapPromise, diff --git a/defi/l2/v2/storeToDb.ts b/defi/l2/v2/storeToDb.ts index 4533b9eed4..c2c6fef9dc 100644 --- a/defi/l2/v2/storeToDb.ts +++ b/defi/l2/v2/storeToDb.ts @@ -98,8 +98,20 @@ export async function fetchHistoricalFromDB( if (cached && Array.isArray(cached) && cached.length > 0) return cached; } - // fallback to DB for cache misses, raw, or breakdown queries - return fetchHistoricalFromDBDirect(chain, isRaw, breakdown); + const result = await fetchHistoricalFromDBDirect(chain, isRaw, breakdown); + + // Warm the cache on read-path misses so subsequent requests don't re-hit the DB. + // Totals-only: raw/breakdown responses bypass the cache and aren't stored. + if (!isRaw && !breakdown && Array.isArray(result) && result.length > 0) { + try { + if (chain) await storeChainHistory(chain, result); + else await storeAllChainsHistory(result); + } catch (e: any) { + console.error("cache warm failed:", e?.message); + } + } + + return result; } async function fetchHistoricalFromDBDirect( @@ -153,6 +165,7 @@ async function fetchHistoricalFromDBDirect( totalsEntry.data[section] = d[chain][section].total; }); } else { + if (!d[c]) return; const readableChain = getChainDisplayName(c, true); totalsEntry.data[readableChain] = {}; Object.keys(d[c]).map((section) => { @@ -167,7 +180,7 @@ async function fetchHistoricalFromDBDirect( return totalsData; } - const symbolMap: { [key: string]: string } = await getR2JSONString("chainAssetsSymbolMap"); + const symbolMap: { [key: string]: string } = (await getR2JSONString("chainAssetsSymbolMap")) ?? {}; const symbolData: any[] = []; data.map((d: any) => { @@ -177,13 +190,14 @@ async function fetchHistoricalFromDBDirect( if (chain) { if (!d[chain]) return; Object.keys(d[chain]).map((section) => { - symbolEntry.data[section] = { total: d[c][section].total, breakdown: {} }; - Object.keys(d[c][section].breakdown ?? {}).forEach((asset: string) => { + symbolEntry.data[section] = { total: d[chain][section].total, breakdown: {} }; + Object.keys(d[chain][section].breakdown ?? {}).forEach((asset: string) => { if (!symbolMap[asset]) return; - symbolEntry.data[section].breakdown[symbolMap[asset]] = d[c][section].breakdown[asset]; + symbolEntry.data[section].breakdown[symbolMap[asset]] = d[chain][section].breakdown[asset]; }); }); } else { + if (!d[c]) return; const readableChain = getChainDisplayName(c, true); symbolEntry.data[readableChain] = {}; Object.keys(d[c]).map((section) => { diff --git a/defi/l2/v2/verifyChanges.ts b/defi/l2/v2/verifyChanges.ts new file mode 100644 index 0000000000..c1bbf86c38 --- /dev/null +++ b/defi/l2/v2/verifyChanges.ts @@ -0,0 +1,65 @@ +import postgres from "postgres"; +import { queryPostgresWithRetry } from "../../src/utils/shared/bridgedTvlPostgres"; +import { getCurrentUnixTimestamp } from "../../src/utils/date"; +import { FinalData } from "../types"; + +// Threshold below which a drop to zero is treated as dust, not a regression. +const ZERO_REGRESSION_FLOOR_USD = 100_000; + +function sqlConn() { + const auth = process.env.COINS2_AUTH?.split(",") ?? []; + if (auth.length !== 3) throw new Error("COINS2_AUTH must have 3 values"); + return postgres(auth[0], { idle_timeout: 30 }); +} + +export async function verifyChangesV2(chains: FinalData): Promise { + const sql = sqlConn(); + let rows: any[]; + try { + rows = await queryPostgresWithRetry( + sql`select timestamp, value from chainassets2 order by timestamp desc limit 1`, + sql, + ); + } finally { + sql.end(); + } + + if (!rows.length) return; // first ever run — nothing to compare against + const prev = JSON.parse(rows[0].value); + const prevTs = Number(rows[0].timestamp); + const hours = (getCurrentUnixTimestamp() - prevTs) / 3600; + + const issues: string[] = []; + + for (const chain of Object.keys(chains)) { + const allNew = chains[chain]; + const allOld = prev[chain]; + const totalNew = Number(allNew?.total?.total ?? 0); + const totalOld = Number(allOld?.total?.total ?? 0); + + if (chain.toLowerCase() === "bsc") console.log(`BSC own tokens: ${allNew.ownTokens.total}`); + if (chain.toLowerCase() === "solana" && totalNew < 1000) throw new Error(`Missing Solana TVL`); + if (chain.toLowerCase() === "tron" && totalNew < 15_000_000_000) throw new Error(`USDT not counted for Tron`); + + // Regression from real TVL to zero, regardless of staleness. + if (totalOld > ZERO_REGRESSION_FLOOR_USD && totalNew === 0) { + issues.push(`${chain}: total collapsed from ${totalOld.toFixed(0)} to 0`); + continue; + } + + if (!allOld || totalOld === 0) continue; + + // Drift check only meaningful when previous snapshot is recent. + if (hours >= 6) continue; + + const forwardChange = (100 * Math.abs(totalNew - totalOld)) / totalOld; + const backwardChange = totalNew !== 0 ? (100 * Math.abs(totalNew - totalOld)) / totalNew : 0; + if (forwardChange < 100 && backwardChange < 100) continue; + + issues.push( + `${chain} has had a ${totalNew > totalOld ? "increase" : "decrease"} of ${forwardChange.toFixed(0)}% in ${hours.toFixed(1)}h`, + ); + } + + if (issues.length) throw new Error(`verifyChangesV2:\n ${issues.join("\n ")}`); +} diff --git a/defi/src/utils/shared/constants.ts b/defi/src/utils/shared/constants.ts index cb6d849203..560fa3e1ff 100644 --- a/defi/src/utils/shared/constants.ts +++ b/defi/src/utils/shared/constants.ts @@ -6,6 +6,6 @@ export const chainsThatShouldNotBeLowerCased = ["solana", "bitcoin", "eclipse"]; export const bridgedTvlMixedCaseChains: string[] = ["solana", "tron", "sui", "aptos", "eclipse", "stellar"]; -export const chainsWithCaseSensitiveDataProviders = ['aptos', 'stacks', 'sui', 'tron'] +export const chainsWithCaseSensitiveDataProviders = ['aptos', 'stacks', 'sui', 'tron', 'stellar'] export const nullAddress = "0x0000000000000000000000000000000000000000"; From 7a140e4b5e3fb6221e69bb7ff0e71cf56465eabf Mon Sep 17 00:00:00 2001 From: wayne Date: Thu, 7 May 2026 09:15:15 +0100 Subject: [PATCH 7/7] patches TO TEST --- defi/l2/utils.ts | 6 +- defi/l2/v2/file-cache.ts | 19 ++++++- defi/l2/v2/index.ts | 52 ++++++++++++------ defi/l2/v2/storeToDb.ts | 115 +++++++++++++++++++++++---------------- 4 files changed, 125 insertions(+), 67 deletions(-) diff --git a/defi/l2/utils.ts b/defi/l2/utils.ts index 849b82ef11..3441c7a338 100644 --- a/defi/l2/utils.ts +++ b/defi/l2/utils.ts @@ -314,7 +314,7 @@ async function getStellarSupplies(tokens: string[], timestamp?: number): Promise const classicKey = stellarSacToClassic[upper] ?? upper; // base (code, issuer); issuer is the trailing 56-char G-prefixed string; // anything after that (e.g. "-1", "-2") is a versioning artifact. - const m = classicKey.match(/^(.+-[A-Z2-7]{55})(?:-\d+)?$/); + const m = classicKey.match(/^(.+-[A-Z2-7]{56})(?:-\d+)?$/); canonicalKey = m ? m[1] : classicKey; } if (seen.has(canonicalKey)) continue; @@ -334,7 +334,9 @@ async function getStellarSupplies(tokens: string[], timestamp?: number): Promise // Native Soroban contracts: call total_supply() via RPC if (isSorobanContractId(token)) { const supply = await getSorobanTokenSupply(token); - if (supply != null && supply > BigInt(0)) supplies[`stellar:${rawToken}`] = Number(supply); + // getSorobanTokenSupply returns number | null; mixing BigInt(0) into + // the comparison would throw at runtime, so compare in the number domain. + if (supply != null && supply > 0) supplies[`stellar:${rawToken}`] = supply; return; } diff --git a/defi/l2/v2/file-cache.ts b/defi/l2/v2/file-cache.ts index 43b516c70a..7d5f0e3996 100644 --- a/defi/l2/v2/file-cache.ts +++ b/defi/l2/v2/file-cache.ts @@ -27,10 +27,15 @@ async function ensureDirExists(folder: string): Promise { async function storeData(subPath: string, data: any): Promise { const filePath = path.join(VERSIONED_CACHE_DIR, subPath); await ensureDirExists(path.dirname(filePath)); + // Write to a temp file in the same directory and rename, so a crash mid-write + // can't leave a truncated/empty cache file in place. + const tmpPath = `${filePath}.${process.pid}.${Date.now()}.tmp`; try { - await fs.promises.writeFile(filePath, JSON.stringify(data)); + await fs.promises.writeFile(tmpPath, JSON.stringify(data)); + await fs.promises.rename(tmpPath, filePath); } catch (e) { console.error(`Error storing cache ${filePath}:`, (e as any)?.message); + fs.promises.unlink(tmpPath).catch(() => {}); } } @@ -44,12 +49,20 @@ async function readData(subPath: string): Promise { } } +// Strip anything that could escape the cache directory (slashes, dots, control +// chars) before the chain name is interpolated into a file path. Dashes and +// underscores stay so existing chain keys like "polygon-zkevm" round-trip. +function normalizeChain(chain: string): string { + const cleaned = (chain ?? "").replace(/[^a-zA-Z0-9_-]/g, "").toLowerCase(); + return cleaned || "unknown"; +} + export async function storeChainHistory(chain: string, data: any[]): Promise { - await storeData(`history/${chain}.json`, data); + await storeData(`history/${normalizeChain(chain)}.json`, data); } export async function readChainHistory(chain: string): Promise { - return readData(`history/${chain}.json`); + return readData(`history/${normalizeChain(chain)}.json`); } export async function storeAllChainsHistory(data: any[]): Promise { diff --git a/defi/l2/v2/index.ts b/defi/l2/v2/index.ts index c472cc4eb2..6e2c583162 100644 --- a/defi/l2/v2/index.ts +++ b/defi/l2/v2/index.ts @@ -21,7 +21,7 @@ import { additional, excluded } from "../adapters/manual"; import { storeHistoricalToDB } from "./storeToDb"; import { stablecoins } from "../../src/getProtocols"; import { metadata as rwaMetadata } from "../../src/rwa/protocols"; -import { verifyChanges } from "../verifyChanges"; +import { verifyChangesV2 } from "./verifyChanges"; import { fetchTokensList } from "../../src/utils/coinsApi"; import { getClosestProtocolItem, getLatestProtocolItems, initializeTVLCacheDB } from "../../src/api2/db"; import { hourlyRawTokensTvl } from "../../src/utils/getLastRecord"; @@ -384,7 +384,7 @@ const newChainAssets = () => ({ }); // main function -export async function storeChainAssetsV2(override: boolean = false) { +export async function storeChainAssetsV2(override: boolean = false, dryRun: boolean = false) { if (!process.env.COINS_V4_API_URL) { throw new Error("storeChainAssetsV2 requires COINS_V4_API_URL — run with coins v4 API configured"); } @@ -411,12 +411,25 @@ export async function storeChainAssetsV2(override: boolean = false) { console.log(`[bridge-price-fill] resolving ${missingBridgeKeys.length} missing bridge/protocol keys via coins API`); try { const extra = await coins.getPrices(missingBridgeKeys, timestamp); - const got = Object.keys(extra).length; - console.log(`[bridge-price-fill] resolved ${got}/${missingBridgeKeys.length} (sample:`, Object.keys(extra).slice(0, 5), ")"); - Object.keys(extra).forEach((k) => { + const resolvedKeys = Object.keys(extra); + console.log(`[bridge-price-fill] resolved ${resolvedKeys.length}/${missingBridgeKeys.length} (sample:`, resolvedKeys.slice(0, 5), ")"); + resolvedKeys.forEach((k) => { if (k.startsWith("coingecko:")) (extra[k] as any).decimals = 0; // match native-side convention allPrices[k] = extra[k]; }); + // nativeDataAfterMcaps drops keys that have no mcap entry, so the bridge + // fill must include mcaps for the same keys or every newly-priced bridge + // amount silently disappears in the next stage. + if (resolvedKeys.length) { + try { + const extraMcaps = await coins.getMcaps(resolvedKeys, timestamp); + Object.keys(extraMcaps).forEach((k) => { + allMcaps[k] = extraMcaps[k]; + }); + } catch (e: any) { + console.warn("[bridge-price-fill] mcap fetch failed:", e?.message); + } + } } catch (e: any) { console.warn("[bridge-price-fill] failed:", e?.message); } @@ -426,12 +439,16 @@ export async function storeChainAssetsV2(override: boolean = false) { // ("AXLUSDC", "ATOM"). normalizeKey turns those into "coingecko:axlusdc", which // almost never matches a real coingecko id. Build a reverse symbol → pricing // index from allPrices so those can still be resolved in the destination loop. - const symbolToPrice: { [symbol: string]: CoinsApiData } = {}; - Object.values(allPrices).forEach((p) => { + // We also keep the resolved key alongside the data so the destination loop + // can re-key the amount under a key that actually exists in allMcaps; storing + // under the unresolved "coingecko:axlusdc" would cause it to be dropped by + // the mcap stage downstream. + const symbolToPrice: { [symbol: string]: { key: string; data: CoinsApiData } } = {}; + Object.entries(allPrices).forEach(([priceKey, p]) => { const sym = p?.symbol?.toUpperCase(); if (!sym) return; // Prefer the first resolution; symbol collisions are ambiguous by construction. - if (!symbolToPrice[sym]) symbolToPrice[sym] = p; + if (!symbolToPrice[sym]) symbolToPrice[sym] = { key: priceKey, data: p }; }); // adjust native asset balances by excluded and outgoing amounts @@ -464,6 +481,7 @@ export async function storeChainAssetsV2(override: boolean = false) { Object.keys(destinationChainAmount).map((token: string) => { const key = normalizeKey(token); let coinData = allPrices[key]; + let outputKey = key; // When the adapter was symbol-keyed, `key` looks like "coingecko:axlusdc" but // there is no such coingecko id. Fall back to a symbol reverse lookup. // Symbol-keyed adapter output is also typically in whole tokens, not base @@ -472,15 +490,18 @@ export async function storeChainAssetsV2(override: boolean = false) { if ((!coinData || !coinData.price) && key.startsWith("coingecko:")) { const symbol = key.slice("coingecko:".length).toUpperCase(); const resolved = symbolToPrice[symbol]; - if (resolved?.price) { - coinData = resolved; + if (resolved?.data?.price) { + coinData = resolved.data; + // Re-key the amount under the resolved key so allMcaps lookups + // downstream actually find an entry. + outputKey = resolved.key; isWholeTokenAmount = true; } } if (!coinData || !coinData.price) return; const divisor = isWholeTokenAmount ? BigNumber(1) : BigNumber(10).pow(coinData.decimals); const usdAmount = destinationChainAmount[token].times(coinData.price).div(divisor); - nativeDataAfterDeductions[chain][key] = usdAmount; + nativeDataAfterDeductions[chain][outputKey] = usdAmount; }); } }); @@ -564,10 +585,9 @@ export async function storeChainAssetsV2(override: boolean = false) { }); }); - // create symbol key data — only actually issue the R2 write when not in dry-run mode - const symbolMapPromise = dryRun - ? Promise.resolve() - : storeR2JSONString("chainAssetsSymbolMap", JSON.stringify(symbolMap)); + // The symbol-map R2 write is deferred to the post-validation Promise.all + // below so we never persist an updated map for a snapshot that + // verifyChangesV2 ends up rejecting. [rawData, symbolData].map((allData) => { Object.keys(allData).map((chain: Chain) => { let totalTotal = zero; @@ -613,7 +633,7 @@ export async function storeChainAssetsV2(override: boolean = false) { } await Promise.all([ - symbolMapPromise, + storeR2JSONString("chainAssetsSymbolMap", JSON.stringify(symbolMap)), storeHistoricalToDB({ timestamp: getCurrentUnixTimestamp(), value: rawData }), storeR2JSONString("chainAssets2", JSON.stringify({ timestamp: getCurrentUnixTimestamp(), value: symbolData })), ]); diff --git a/defi/l2/v2/storeToDb.ts b/defi/l2/v2/storeToDb.ts index c2c6fef9dc..557c61d23f 100644 --- a/defi/l2/v2/storeToDb.ts +++ b/defi/l2/v2/storeToDb.ts @@ -27,31 +27,44 @@ export async function storeHistoricalToDB(res: { timestamp: number; value: any } }; const sql = await iniDbConnection(); - await queryPostgresWithRetry( - sql` - insert into chainassets2 - ${sql([write], ...columns)} - `, - sql - ); - sql.end(); - - await precomputeHistoricalCache(); + try { + await queryPostgresWithRetry( + sql` + insert into chainassets2 + ${sql([write], ...columns)} + `, + sql + ); + } finally { + sql.end(); + } + + // Cache rebuild is best-effort: the row is already persisted, so a precompute + // failure must not surface as a publish failure. + try { + await precomputeHistoricalCache(); + } catch (e: any) { + console.error("precomputeHistoricalCache failed (DB row already persisted):", e?.message); + } } // precompute all-chains and per-chain totals into file cache after each DB store async function precomputeHistoricalCache() { const sql = await iniDbConnection(); - const rows = await queryPostgresWithRetry( - sql` - select distinct on (date_trunc('day', to_timestamp(timestamp))) - timestamp, value - from chainassets2 - order by date_trunc('day', to_timestamp(timestamp)) asc, timestamp desc - `, - sql - ); - sql.end(); + let rows: any[]; + try { + rows = await queryPostgresWithRetry( + sql` + select distinct on (date_trunc('day', to_timestamp(timestamp) AT TIME ZONE 'UTC')) + timestamp, value + from chainassets2 + order by date_trunc('day', to_timestamp(timestamp) AT TIME ZONE 'UTC') asc, timestamp desc + `, + sql + ); + } finally { + sql.end(); + } rows.sort((a: any, b: any) => a.timestamp - b.timestamp); @@ -121,30 +134,34 @@ async function fetchHistoricalFromDBDirect( ) { const sql = await iniDbConnection(); - const allData = chain - ? await queryPostgresWithRetry( - sql` - select timestamp, value::jsonb->${chain} as chain_value - from ( - select distinct on (date_trunc('day', to_timestamp(timestamp))) + let allData: any[]; + try { + allData = chain + ? await queryPostgresWithRetry( + sql` + select timestamp, value::jsonb->${chain} as chain_value + from ( + select distinct on (date_trunc('day', to_timestamp(timestamp) AT TIME ZONE 'UTC')) + timestamp, value + from chainassets2 + order by date_trunc('day', to_timestamp(timestamp) AT TIME ZONE 'UTC') asc, timestamp desc + ) daily + order by timestamp asc + `, + sql + ) + : await queryPostgresWithRetry( + sql` + select distinct on (date_trunc('day', to_timestamp(timestamp) AT TIME ZONE 'UTC')) timestamp, value from chainassets2 - order by date_trunc('day', to_timestamp(timestamp)) asc, timestamp desc - ) daily - order by timestamp asc - `, - sql - ) - : await queryPostgresWithRetry( - sql` - select distinct on (date_trunc('day', to_timestamp(timestamp))) - timestamp, value - from chainassets2 - order by date_trunc('day', to_timestamp(timestamp)) asc, timestamp desc - `, - sql - ); - sql.end(); + order by date_trunc('day', to_timestamp(timestamp) AT TIME ZONE 'UTC') asc, timestamp desc + `, + sql + ); + } finally { + sql.end(); + } const data = chain ? allData.filter((d: any) => d.chain_value != null).map((d: any) => ({ timestamp: d.timestamp, [chain]: d.chain_value })) @@ -192,8 +209,12 @@ async function fetchHistoricalFromDBDirect( Object.keys(d[chain]).map((section) => { symbolEntry.data[section] = { total: d[chain][section].total, breakdown: {} }; Object.keys(d[chain][section].breakdown ?? {}).forEach((asset: string) => { - if (!symbolMap[asset]) return; - symbolEntry.data[section].breakdown[symbolMap[asset]] = d[chain][section].breakdown[asset]; + const symbol = symbolMap[asset]; + if (!symbol) return; + // Multiple asset keys can resolve to the same symbol; aggregate to + // avoid silently overwriting earlier entries. + const prev = Number(symbolEntry.data[section].breakdown[symbol] ?? 0); + symbolEntry.data[section].breakdown[symbol] = prev + Number(d[chain][section].breakdown[asset] ?? 0); }); }); } else { @@ -203,8 +224,10 @@ async function fetchHistoricalFromDBDirect( Object.keys(d[c]).map((section) => { symbolEntry.data[readableChain][section] = { total: d[c][section].total, breakdown: {} }; Object.keys(d[c][section].breakdown ?? {}).forEach((asset: string) => { - if (!symbolMap[asset]) return; - symbolEntry.data[readableChain][section].breakdown[symbolMap[asset]] = d[c][section].breakdown[asset]; + const symbol = symbolMap[asset]; + if (!symbol) return; + const prev = Number(symbolEntry.data[readableChain][section].breakdown[symbol] ?? 0); + symbolEntry.data[readableChain][section].breakdown[symbol] = prev + Number(d[c][section].breakdown[asset] ?? 0); }); }); }