-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Refactor Chain assets #12222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Refactor Chain assets #12222
Changes from 5 commits
340eda0
dcc692c
777724e
7e6d375
2e7e05b
16873e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| import dynamodb from "../src/utils/shared/dynamodb"; | ||
| import { getRecordClosestToTimestamp } from "../src/utils/shared/getRecordClosestToTimestamp"; | ||
| import { initializeTVLCacheDB, TABLES } from "../src/api2/db"; | ||
| import { getTimestampString } from "../src/api2/utils"; | ||
| import { getCurrentUnixTimestamp } from "../src/utils/date"; | ||
|
|
||
| // storage for chain assets on the main (tvl cache) db and ddb, replacing the coins2 tables: | ||
| // - dailyChainAssets PG table: one row per chain per day, totals in `data`, token breakdowns | ||
| // in `breakdown`. Serves chart/historical reads. Backfilled by l2/cli/migrateDailyChainAssets.ts. | ||
| // - hourlyChainAssets DDB items: totals only for all chains in one item per run | ||
|
|
||
| export const hourlyChainAssetsPK = "hourlyChainAssets"; | ||
| const secondsInADay = 86400; | ||
|
|
||
| export function transformCell(chain: string, rawCell: string | null, day: number): any | null { | ||
| if (!rawCell) return null; | ||
| let cell: any; | ||
| try { | ||
| cell = JSON.parse(rawCell); | ||
| } catch { | ||
| return null; | ||
| } | ||
| if (!cell || typeof cell != "object") return null; | ||
|
|
||
| const data: { [section: string]: number } = {}; | ||
| const breakdown: { [section: string]: { [symbol: string]: number } } = {}; | ||
| Object.keys(cell).map((section: string) => { | ||
| const total = Number(cell[section]?.total); | ||
| if (isNaN(total)) return; | ||
| data[section] = total; | ||
| breakdown[section] = {}; | ||
| Object.keys(cell[section]?.breakdown ?? {}).map((symbol: string) => { | ||
| const value = Number(cell[section].breakdown[symbol]); | ||
| if (!isNaN(value)) breakdown[section][symbol] = value; | ||
| }); | ||
| }); | ||
| if (!Object.keys(data).length) return null; | ||
|
|
||
| return { id: chain, timestamp: day, timeS: getTimestampString(day), data, breakdown }; | ||
| } | ||
|
|
||
| // upserted on every run, so a day's row converges to the last run of that day | ||
| export async function storeDailyChainAssets(res: any): Promise<number> { | ||
| const timestamp = Number(res.timestamp); | ||
| if (isNaN(timestamp)) throw new Error("res.timestamp missing"); | ||
| const day = Math.floor(timestamp / secondsInADay) * secondsInADay; | ||
|
|
||
| const records: any[] = []; | ||
| Object.keys(res).map((chain: string) => { | ||
| if (chain == "timestamp") return; | ||
| const record = transformCell(chain, JSON.stringify(res[chain]), day); | ||
| if (record) records.push(record); | ||
| }); | ||
| if (!records.length) throw new Error("no daily chain assets records to store"); | ||
|
|
||
| await initializeTVLCacheDB(); | ||
| await TABLES.DAILY_CHAIN_ASSETS.bulkCreate(records, { | ||
| updateOnDuplicate: ["timestamp", "data", "breakdown", "updatedat"], | ||
| }); | ||
| return records.length; | ||
| } | ||
|
|
||
| export async function storeHourlyChainAssets(res: any): Promise<void> { | ||
| const timestamp = Number(res.timestamp); | ||
| if (isNaN(timestamp)) throw new Error("res.timestamp missing"); | ||
|
|
||
| const data: { [chain: string]: { [section: string]: number } } = {}; | ||
| Object.keys(res).map((chain: string) => { | ||
| if (chain == "timestamp") return; | ||
| const record = transformCell(chain, JSON.stringify(res[chain]), timestamp); | ||
| if (record) data[chain] = record.data; | ||
| }); | ||
| if (!Object.keys(data).length) throw new Error("no hourly chain assets data to store"); | ||
|
|
||
| await dynamodb.put({ PK: hourlyChainAssetsPK, SK: timestamp, data }); | ||
| } | ||
|
|
||
| export async function fetchDailyChainAssetsCharts(): Promise<{ | ||
| [chain: string]: { timestamp: number; data: { [section: string]: string } }[]; | ||
| }> { | ||
| await initializeTVLCacheDB(); | ||
| const rows: any[] = await TABLES.DAILY_CHAIN_ASSETS.findAll({ | ||
| attributes: ["id", "timestamp", "data"], | ||
| raw: true, | ||
| order: [["timestamp", "ASC"]], | ||
| }); | ||
|
|
||
| const charts: { [chain: string]: { timestamp: number; data: { [section: string]: string } }[] } = {}; | ||
| rows.map((row: any) => { | ||
| const totals: { [section: string]: string } = {}; | ||
| Object.keys(row.data ?? {}).map((section: string) => { | ||
| totals[section] = Number(row.data[section]).toFixed(); | ||
| }); | ||
| if (!charts[row.id]) charts[row.id] = []; | ||
| charts[row.id].push({ timestamp: Number(row.timestamp), data: totals }); | ||
| }); | ||
| return charts; | ||
| } | ||
|
|
||
| // same output shape as fetchFlows() in l2/storeToDb.ts, reading the two | ||
| // closest hourly snapshots from ddb instead of scanning the coins2 table | ||
| export async function fetchFlowsFromDDB(period: number) { | ||
| const now = getCurrentUnixTimestamp(); | ||
| const searchWidth = period / 2; | ||
| const [end, start] = await Promise.all([ | ||
| getRecordClosestToTimestamp(hourlyChainAssetsPK, now, searchWidth), | ||
| getRecordClosestToTimestamp(hourlyChainAssetsPK, now - period, searchWidth), | ||
| ]); | ||
| if (!end?.data || !start?.data) throw new Error("missing hourly chain assets snapshots in ddb"); | ||
| if (end.SK == start.SK) throw new Error("only one hourly chain assets snapshot in ddb period"); | ||
|
|
||
| const res: any = {}; | ||
| Object.keys(end.data).map((chain: string) => { | ||
| res[chain] = {}; | ||
| Object.keys(end.data[chain]).map((k: string) => { | ||
| if (!start.data[chain] || !(k in start.data[chain])) { | ||
| res[chain][k] = { perc: "0", raw: "0" }; | ||
| return; | ||
| } | ||
| const a = Number(start.data[chain][k]); | ||
| const b = Number(end.data[chain][k]); | ||
| const raw = (b - a).toFixed(); | ||
| if (a != 0 && b == 0) res[chain][k] = { perc: "-100", raw }; | ||
| else if (b == 0) res[chain][k] = { perc: "0", raw }; | ||
| else if (a == 0) res[chain][k] = { perc: "100", raw }; | ||
| else res[chain][k] = { perc: ((100 * (b - a)) / a).toFixed(2), raw }; | ||
| }); | ||
| }); | ||
|
|
||
| return res; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| import postgres from "postgres"; | ||
| import { queryPostgresWithRetry } from "../../src/utils/shared/bridgedTvlPostgres"; | ||
| import { closeConnection, initializeTVLCacheDB, TABLES } from "../../src/api2/db"; | ||
| import { transformCell } from "../chainAssetsDb"; | ||
|
|
||
| // One-off migration: copies daily snapshots from the `chainassets` table into the `dailyChainAssets` table on the TVL cache db | ||
| // - Source is only read, never modified. | ||
| // - Upserts on (id, timeS), so it is idempotent and safe to re-run or resume. | ||
| // - Day selection replicates findDailyEntries() in l2/storeToDb.ts (record closest to each | ||
| // UTC midnight) so results can be diffed against the current /chain-assets/chart output. | ||
| // | ||
| // DRY_RUN=true ts-node defi/l2/cli/migrateDailyChainAssets.ts [startDay] [endDay] // days as YYYY-MM-DD | ||
|
|
||
| const secondsInADay = 86400; | ||
| const fetchChunkSize = 20; | ||
| const writeChunkSize = 500; | ||
| const dryRun = !!process.env.DRY_RUN; | ||
|
|
||
| async function iniSourceDbConnection() { | ||
| const auth = process.env.COINS2_AUTH?.split(",") ?? []; | ||
| if (!auth || auth.length != 3) throw new Error("there aren't 3 auth params"); | ||
| return postgres(auth[0], { idle_timeout: 90 }); | ||
| } | ||
|
|
||
| function parseDayArg(arg: string | undefined): number | undefined { | ||
| if (!arg) return undefined; | ||
| const ms = Date.parse(`${arg}T00:00:00Z`); | ||
| if (isNaN(ms)) throw new Error(`invalid day argument: ${arg}, expected YYYY-MM-DD`); | ||
| return ms / 1000; | ||
| } | ||
|
|
||
| // for each UTC midnight, pick the closest record; ties keep the earlier record like findDailyEntries() | ||
| export function selectDailyTimestamps( | ||
| timestamps: number[], | ||
| startDay?: number, | ||
| endDay?: number | ||
| ): { [timestamp: number]: number[] } { | ||
| const firstDay = Math.floor(timestamps[0] / secondsInADay) * secondsInADay; | ||
| const lastDay = Math.floor(timestamps[timestamps.length - 1] / secondsInADay) * secondsInADay; | ||
| const daysByTimestamp: { [timestamp: number]: number[] } = {}; | ||
| let i = 0; | ||
| for (let day = firstDay; day <= lastDay; day += secondsInADay) { | ||
| if ((startDay && day < startDay) || (endDay && day > endDay)) continue; | ||
| while (i < timestamps.length - 1 && Math.abs(timestamps[i + 1] - day) < Math.abs(timestamps[i] - day)) i++; | ||
| if (!daysByTimestamp[timestamps[i]]) daysByTimestamp[timestamps[i]] = []; | ||
| daysByTimestamp[timestamps[i]].push(day); | ||
| } | ||
| return daysByTimestamp; | ||
| } | ||
|
|
||
| async function main() { | ||
| const startDay = parseDayArg(process.argv[2]); | ||
| const endDay = parseDayArg(process.argv[3]); | ||
| const sql = await iniSourceDbConnection(); | ||
|
|
||
| try { | ||
| const timestampRows = await queryPostgresWithRetry(sql`select timestamp from chainassets`, sql); | ||
| // dedupe values: the source has no primary key | ||
| const timestamps: number[] = [ | ||
| ...new Set<number>(timestampRows.map((r: any) => Number(r.timestamp)).filter((t: number) => !isNaN(t))), | ||
| ].sort((a: number, b: number) => a - b); | ||
| if (!timestamps.length) throw new Error("no rows found in chainassets"); | ||
|
|
||
| const daysByTimestamp = selectDailyTimestamps(timestamps, startDay, endDay); | ||
|
|
||
| const selected = Object.keys(daysByTimestamp) | ||
| .map(Number) | ||
| .sort((a, b) => a - b); | ||
|
|
||
| if (!dryRun) await initializeTVLCacheDB(); | ||
|
|
||
| let written = 0; | ||
| for (let c = 0; c < selected.length; c += fetchChunkSize) { | ||
| const chunk = selected.slice(c, c + fetchChunkSize); | ||
| const rows = await queryPostgresWithRetry( | ||
| sql`select * from chainassets where timestamp in ${sql(chunk.map((t) => t.toFixed()))}`, | ||
| sql | ||
| ); | ||
|
|
||
| // source table has no primary key so duplicate timestamps can exist, dedupe on (id, timeS) | ||
| const recordMap: { [key: string]: any } = {}; | ||
| rows.map((row: any) => { | ||
| const days = daysByTimestamp[Number(row.timestamp)] ?? []; | ||
| days.map((day: number) => { | ||
| Object.keys(row).map((chain: string) => { | ||
| if (chain == "timestamp") return; | ||
| const record = transformCell(chain, row[chain], day); | ||
| if (record) recordMap[`${record.id}|${record.timeS}`] = record; | ||
| }); | ||
| }); | ||
| }); | ||
| const records = Object.values(recordMap); | ||
|
|
||
| if (dryRun) { | ||
| if (c == 0 && records.length) console.log("sample record:", JSON.stringify(records[0])); | ||
| } else { | ||
| for (let w = 0; w < records.length; w += writeChunkSize) | ||
| await TABLES.DAILY_CHAIN_ASSETS.bulkCreate(records.slice(w, w + writeChunkSize), { | ||
| updateOnDuplicate: ["timestamp", "data", "breakdown", "updatedat"], | ||
| }); | ||
| } | ||
|
|
||
| written += records.length; | ||
| } | ||
|
|
||
| console.log(`done, ${written} daily records ${dryRun ? "transformed" : "written"}`); | ||
| } finally { | ||
| sql.end(); | ||
| await closeConnection(); | ||
| } | ||
| } | ||
|
|
||
| if (require.main === module) main(); // ts-node defi/l2/cli/migrateDailyChainAssets.ts |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,6 +2,7 @@ import chainAssets from "../l2/tvl"; | |||||||||||||||||||||||
| import { storeR2JSONString } from "../src/utils/r2"; | ||||||||||||||||||||||||
| import { getCurrentUnixTimestamp } from "../src/utils/date"; | ||||||||||||||||||||||||
| import storeHistorical from "../l2/storeToDb"; | ||||||||||||||||||||||||
| import { storeDailyChainAssets, storeHourlyChainAssets } from "./chainAssetsDb"; | ||||||||||||||||||||||||
| import { storeChainAssetsV2 } from "./v2"; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| export default async function storeChainAssets(override: boolean) { | ||||||||||||||||||||||||
|
|
@@ -12,6 +13,13 @@ export default async function storeChainAssets(override: boolean) { | |||||||||||||||||||||||
| await storeR2JSONString("chainAssets", JSON.stringify(res)); | ||||||||||||||||||||||||
| await storeHistorical(res); | ||||||||||||||||||||||||
| console.log("chain assets stored"); | ||||||||||||||||||||||||
| // dual-write to the new tables while readers are being moved over | ||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||
| await storeDailyChainAssets(res); | ||||||||||||||||||||||||
| await storeHourlyChainAssets(res); | ||||||||||||||||||||||||
|
Comment on lines
+17
to
+19
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sequential execution skips hourly write if daily write fails. The sequential 🔄 Proposed fix to ensure both writes are attempted // dual-write to the new tables while readers are being moved over
try {
- await storeDailyChainAssets(res);
- await storeHourlyChainAssets(res);
+ await Promise.allSettled([
+ storeDailyChainAssets(res),
+ storeHourlyChainAssets(res),
+ ]);
} catch (e) {
console.error("storing daily/hourly chain assets failed", e);
}Alternatively, use separate try-catch blocks: // dual-write to the new tables while readers are being moved over
- try {
- await storeDailyChainAssets(res);
- await storeHourlyChainAssets(res);
- } catch (e) {
- console.error("storing daily/hourly chain assets failed", e);
- }
+ try {
+ await storeDailyChainAssets(res);
+ } catch (e) {
+ console.error("storing daily chain assets failed", e);
+ }
+ try {
+ await storeHourlyChainAssets(res);
+ } catch (e) {
+ console.error("storing hourly chain assets failed", e);
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||
| } catch (e) { | ||||||||||||||||||||||||
| console.error("storing daily/hourly chain assets failed", e); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| await storeChainAssetsV2(override); | ||||||||||||||||||||||||
| console.log("chain assets v2 stored"); | ||||||||||||||||||||||||
| process.exit(); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect handling of new chains/sections in flow calculation.
When a chain or section exists in the
endsnapshot but not instart(indicating it's new in this period), the current code returns{ perc: "0", raw: "0" }. This is incorrect because:rawshould be the actual value fromend.data[chain][k](the full value, since start is 0)percshould indicate growth, typically "100" or similar to show it's a new entryReturning zeros makes it appear there's no change when in fact a new chain/section has appeared with a non-zero value.
🐛 Proposed fix
if (!start.data[chain] || !(k in start.data[chain])) { - res[chain][k] = { perc: "0", raw: "0" }; + const b = Number(end.data[chain][k]); + res[chain][k] = { perc: b === 0 ? "0" : "100", raw: b.toFixed() }; return; }📝 Committable suggestion
🤖 Prompt for AI Agents