From f5650c3ae65b20186cfe3ffa86f0d4aaec73e606 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 12 May 2026 01:02:07 +0200 Subject: [PATCH 1/2] feat: replace credit balance cache with per-expiration buckets Replaces the previous scalar cache + lazy-recompute design with a bucket cache keyed by (address, expiration_date), maintained eagerly by the credit_history writers under a sort-by-expiration policy. The API read path is now a single SELECT with no writes, no FIFO walk, and no cache invalidation logic. Key changes: - New credit_balances schema: (address, expiration_date, amount, last_update) with composite PK. The "no expiration" case uses PG 'infinity'::timestamptz via a psycopg2 adapter registered at connection-module import time, so the column stays NOT NULL and reads can use a uniform expiration_date > now() predicate. The INFINITY sentinel is a datetime subclass exposed through aleph.toolkit.infinity; psycopg2 already decodes PG infinity to year-9999 microsecond-999999 UTC, which compares equal under datetime semantics. - Writers (distribution, expense, transfer) update buckets atomically in the same transaction as the credit_history insert. Expenses drain the soonest-expiring non-expired bucket first under SELECT ... FOR UPDATE, serialising concurrent writers per address. - Transfers compute recipient bucket expirations by capping each consumed source bucket at min(source_expiration, requested_expiration), preserving the existing re-transfer guard. - New background CreditExpirationTask deletes expired buckets via a bounded DELETE statement. Wakes are delivered cross-process via Postgres LISTEN/NOTIFY: writers issue NOTIFY credit_expiration_changed inside their transaction (Postgres queues the payload until commit, so the wake reflects committed state), and the task holds a dedicated psycopg2 LISTEN connection whose fd is registered with the asyncio loop via loop.add_reader. This is necessary because message processing runs in spawned subprocesses (set_start_method("spawn")), so an in-process asyncio.Event would never be reachable from writers. A 1-hour idle timeout serves as defence-in-depth re-poll. - get_credit_balance / get_credit_balance_with_details are pure SELECTs: no FIFO recompute, no write-back. _apply_fifo_consumption and _calculate_credit_balance_fifo are removed. - Repair (aleph.repair.repair_node) now also bootstraps and reconciles the bucket cache from credit_history, runs on every startup, and is idempotent. Replaces the previous one-shot script approach. - Policy change: consumption now sorts by expiration_date (soonest first) rather than by message_timestamp (oldest first). This loses fewer credits to expiration when issuance order and expiration order diverge. FIFO scenario 1 in tests was updated accordingly; scenario 2 remains unchanged since both policies produce the same result there. The previous credit_balances rows are dropped by the migration; the repair function repopulates them on the next startup from credit_history, which is the source of truth. Test plan covered: - bucket helper unit tests (insert/increment, negative/under-funded consumption, sort-by-expiration drain order, sentinel handling) - writer tests (distribution buckets, expense drain order, transfer cap, whitelisted sender, recipient inherits capped expiration) - read path (sum non-expired buckets, with_details grouping, multi- address listing + count) - expiration task (delete sweep, sentinel exclusion in peek, NOTIFY emission on finite-expiration grants, no NOTIFY on sentinel grants, selector callback drains and sets the event) - repair (rebuilds buckets from history, idempotent) - existing transfer / cap / chain tests remain green under the new semantics - removed test_cache_invalidation_on_credit_expiration (no cache to invalidate); replaced with a smaller test asserting that the read query filters expired buckets directly --- ...a6_replace_credit_balances_with_buckets.py | 69 ++ src/aleph/commands.py | 8 + src/aleph/db/accessors/balances.py | 502 ++++++-------- src/aleph/db/connection.py | 6 + src/aleph/db/models/balances.py | 14 +- src/aleph/repair.py | 97 ++- src/aleph/services/credit_expiration.py | 163 +++++ src/aleph/toolkit/infinity.py | 103 +++ tests/db/test_credit_balances.py | 647 +++++++++++++++--- tests/db/test_credit_expiration_task.py | 150 ++++ .../message_processing/test_process_stores.py | 29 +- 11 files changed, 1393 insertions(+), 395 deletions(-) create mode 100644 deployment/migrations/versions/0057_b1c2d3e4f5a6_replace_credit_balances_with_buckets.py create mode 100644 src/aleph/services/credit_expiration.py create mode 100644 src/aleph/toolkit/infinity.py create mode 100644 tests/db/test_credit_expiration_task.py diff --git a/deployment/migrations/versions/0057_b1c2d3e4f5a6_replace_credit_balances_with_buckets.py b/deployment/migrations/versions/0057_b1c2d3e4f5a6_replace_credit_balances_with_buckets.py new file mode 100644 index 000000000..1ff5048a3 --- /dev/null +++ b/deployment/migrations/versions/0057_b1c2d3e4f5a6_replace_credit_balances_with_buckets.py @@ -0,0 +1,69 @@ +"""Replace credit_balances scalar cache with per-expiration buckets + +Revision ID: b1c2d3e4f5a6 +Revises: 7e5a630e4b36 +Create Date: 2026-05-12 + +The previous credit_balances table held a single FIFO-derived scalar per +address, lazily recomputed on read by an O(N^2) Python walk over +credit_history. This migration replaces it with a bucket cache keyed by +(address, expiration_date), eagerly maintained by the credit_history +writers under a sort-by-expiration policy. The "no expiration" case is +encoded as PG ``'infinity'::timestamptz`` so expiration_date stays NOT NULL +inside the composite primary key, and read queries can use a uniform +``expiration_date > now()`` predicate without IS-NULL branches. + +The previous table is dropped rather than migrated; credit_history is the +source of truth and aleph.repair.repair_credit_balances repopulates the +new table from history on the next startup. +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "b1c2d3e4f5a6" +down_revision = "7e5a630e4b36" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.drop_table("credit_balances") + op.create_table( + "credit_balances", + sa.Column("address", sa.String(), nullable=False), + sa.Column("expiration_date", sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column("amount", sa.BigInteger(), nullable=False), + sa.Column( + "last_update", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint( + "address", "expiration_date", name="credit_balances_pkey" + ), + ) + op.create_index( + "credit_balances_expiration_date_idx", + "credit_balances", + ["expiration_date"], + ) + + +def downgrade() -> None: + op.drop_table("credit_balances") + op.create_table( + "credit_balances", + sa.Column("address", sa.String(), nullable=False), + sa.Column("balance", sa.BigInteger(), nullable=False, server_default="0"), + sa.Column( + "last_update", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint("address", name="credit_balances_pkey"), + ) + op.create_index("ix_credit_balances_address", "credit_balances", ["address"]) diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 720378b40..6a31051e7 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -36,6 +36,7 @@ from aleph.repair import repair_node from aleph.services import p2p from aleph.services.cache.node_cache import NodeCache +from aleph.services.credit_expiration import CreditExpirationTask from aleph.services.ipfs import IpfsService from aleph.services.keys import generate_keypair, save_keys from aleph.services.storage.fileystem_engine import FileSystemStorageEngine @@ -224,6 +225,13 @@ async def main(args: List[str]) -> None: tasks.append(cron_job_task(config=config, cron_job=cron_job)) LOGGER.debug("Initialized cron job task") + LOGGER.debug("Initializing credit expiration task") + credit_expiration = CreditExpirationTask( + session_factory=session_factory, config=config + ) + tasks.append(credit_expiration.run()) + LOGGER.debug("Initialized credit expiration task") + LOGGER.debug("Running event loop") await asyncio.gather(*tasks) diff --git a/src/aleph/db/accessors/balances.py b/src/aleph/db/accessors/balances.py index 85538a5c8..d7af3e231 100644 --- a/src/aleph/db/accessors/balances.py +++ b/src/aleph/db/accessors/balances.py @@ -9,6 +9,7 @@ from aleph_message.models import Chain from sqlalchemy import func, select, text +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.sql import Select from aleph.db.models import AlephBalanceDb, AlephCreditBalanceDb, AlephCreditHistoryDb @@ -16,6 +17,7 @@ CREDIT_PRECISION_CUTOFF_TIMESTAMP, CREDIT_PRECISION_MULTIPLIER, ) +from aleph.toolkit.infinity import INFINITY from aleph.toolkit.timestamp import timestamp_to_datetime, utc_now from aleph.types.db_session import DbSession from aleph.types.sort_order import SortByCreditHistory, SortOrder @@ -228,268 +230,214 @@ def get_updated_balance_accounts(session: DbSession, last_update: dt.datetime): @dataclass -class PositiveCredit: - amount: int +class CreditBalanceDetail: expiration_date: Optional[dt.datetime] - timestamp: dt.datetime - remaining: int + amount: int -@dataclass -class NegativeAmount: - amount: int - timestamp: dt.datetime +def _bucket_expiration(expiration_date: Optional[dt.datetime]) -> dt.datetime: + """Map an optional API-level expiration to the DB sentinel for the bucket PK.""" + return expiration_date if expiration_date is not None else INFINITY -@dataclass -class CreditBalanceDetail: - expiration_date: Optional[dt.datetime] - amount: int +def _api_expiration(expiration_date: dt.datetime) -> Optional[dt.datetime]: + """Inverse of _bucket_expiration: convert the sentinel back to None for callers.""" + return None if expiration_date == INFINITY else expiration_date -def _apply_fifo_consumption( - session: DbSession, address: str, now: Optional[dt.datetime] = None -) -> list[PositiveCredit]: +def _apply_grant_bucket( + session: DbSession, + address: str, + amount: int, + expiration_date: Optional[dt.datetime], +) -> None: + """Add ``amount`` (positive) to the bucket for ``(address, expiration_date)``. + + Inserts the bucket if missing, increments it otherwise. ``amount`` may be + negative to undo a previous grant, but consumption of expenses/transfers + should go through ``_consume_address_credits`` so the soonest-expiring + bucket is drained first. """ - Fetch all positive credits for the address and apply all existing debits via FIFO. + bucket_exp = _bucket_expiration(expiration_date) + stmt = pg_insert(AlephCreditBalanceDb).values( + address=address, + expiration_date=bucket_exp, + amount=amount, + ) + stmt = stmt.on_conflict_do_update( + index_elements=[ + AlephCreditBalanceDb.address, + AlephCreditBalanceDb.expiration_date, + ], + set_={ + "amount": AlephCreditBalanceDb.amount + stmt.excluded.amount, + "last_update": func.now(), + }, + ) + session.execute(stmt) + + # Tell the expiration task that something may need attention. NOTIFY is + # transactional: Postgres queues the payload and only delivers it to + # LISTENers when the surrounding transaction commits, so the wake fires + # iff the bucket actually lands. We only signal for finite expirations; + # the sentinel ``infinity`` never fires. + if bucket_exp != INFINITY: + session.execute(text("NOTIFY credit_expiration_changed")) - Returns the list of PositiveCredit objects with `.remaining` reflecting how many - credits are still unconsumed after all past expenses/transfers. Expired credits are - included in the returned list — callers are responsible for filtering by expiration. + +def _consume_address_credits( + session: DbSession, + address: str, + amount: int, + now: Optional[dt.datetime] = None, +) -> List[Tuple[int, dt.datetime]]: + """Drain ``amount`` credits from the address's still-valid buckets, soonest + expiry first. Returns a list of ``(consumed_amount, source_bucket_expiration)`` + in consumption order. The bucket expiration is the DB-stored value (with + sentinel for "no expiration"); callers wanting the API-shaped value should + pass it through ``_api_expiration``. + + ``now`` is the validity cutoff: only buckets whose ``expiration_date`` is + strictly after ``now`` are eligible to be drained. Production callers pass + the expense / transfer ``message_timestamp`` here so a bucket that was + valid at the moment the message was signed remains spendable even if it + has since expired in wall-clock terms. This also keeps the eager-write + path consistent with the repair replay, which uses the historical + ``message_timestamp`` as the cutoff. ``now`` defaults to ``func.now()`` + (evaluated server-side) for callers that genuinely mean "right now". + + Bucket rows are locked ``FOR UPDATE`` to serialise concurrent writers for + the same address. Buckets are not deleted when they reach zero; the + expiration task and the next read both treat zero-amount buckets as inert. + + If ``amount`` exceeds the available credit, the leftover is silently + dropped (matches the prior FIFO behaviour). Going negative is prevented. """ - now = now if now is not None else utc_now() + if amount <= 0: + return [] - records = ( + cutoff = now if now is not None else func.now() + buckets = ( session.execute( - select(AlephCreditHistoryDb) - .where(AlephCreditHistoryDb.address == address) - .order_by(AlephCreditHistoryDb.message_timestamp.asc()) + select(AlephCreditBalanceDb) + .where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date > cutoff, + AlephCreditBalanceDb.amount > 0, + ) + .order_by(AlephCreditBalanceDb.expiration_date.asc()) + .with_for_update() ) .scalars() .all() ) - positive_credits: list[PositiveCredit] = [] - negative_amounts: list[NegativeAmount] = [] - - for record in records: - if record.amount > 0: - positive_credits.append( - PositiveCredit( - amount=record.amount, - expiration_date=record.expiration_date, - timestamp=record.message_timestamp, - remaining=record.amount, - ) - ) - else: - negative_amounts.append( - NegativeAmount( - amount=abs(record.amount), timestamp=record.message_timestamp - ) - ) - - for expense in negative_amounts: - remaining_expense = expense.amount - for credit in positive_credits: - if remaining_expense <= 0: - break - expense_valid = ( - credit.expiration_date is None - or expense.timestamp < credit.expiration_date - ) - if expense_valid and credit.remaining > 0: - consumed = min(credit.remaining, remaining_expense) - credit.remaining -= consumed - remaining_expense -= consumed - - return positive_credits + consumed_log: List[Tuple[int, dt.datetime]] = [] + remaining = amount + for bucket in buckets: + if remaining <= 0: + break + take = min(bucket.amount, remaining) + bucket.amount -= take + remaining -= take + consumed_log.append((take, bucket.expiration_date)) + session.flush() + return consumed_log def _compute_transfer_entries_by_expiration( - remaining_credits: list[PositiveCredit], - amount: int, + consumed_buckets: List[Tuple[int, dt.datetime]], requested_expiration: Optional[dt.datetime], - now: dt.datetime, -) -> list[tuple[int, Optional[dt.datetime]]]: +) -> List[Tuple[int, Optional[dt.datetime]]]: + """For each consumed source bucket, produce the matching recipient entry, + capping the recipient's expiration at the more-restrictive of source vs. + requested. Adjacent entries with the same effective expiration are merged + so a recipient never sees more granularity than necessary. + + The cap rule (``min(source, requested)``) prevents an attacker re-transferring + expiring credits with a longer requested expiration to bypass the original + expiry. Whitelisted senders skip this path entirely (see caller). """ - Simulate consuming `amount` from `remaining_credits` (FIFO order) and return a list - of (portion_amount, effective_expiration) pairs. - - Credits are consumed in the same FIFO order used by the balance calculation, so the - expiration assignment for the recipient is consistent with the sender's accounting. - - The effective expiration for each portion is: - min(source_credit.expiration_date, requested_expiration) - where None means no expiration. - - Adjacent portions with the same effective expiration are merged into one entry. - This prevents a re-transfer from extending or removing the original expiration - constraint placed on the source credits. - """ - result: list[tuple[int, Optional[dt.datetime]]] = [] - remaining_to_consume = amount - - for credit in remaining_credits: - if remaining_to_consume <= 0: - break - if credit.expiration_date is not None and credit.expiration_date <= now: - continue - if credit.remaining <= 0: - continue - - consumed = min(credit.remaining, remaining_to_consume) - remaining_to_consume -= consumed - - # Effective expiration: most restrictive of source and requested - if credit.expiration_date is not None: - effective_exp: Optional[dt.datetime] = ( - credit.expiration_date - if requested_expiration is None - else min(credit.expiration_date, requested_expiration) - ) + result: List[Tuple[int, Optional[dt.datetime]]] = [] + for consumed, source_exp in consumed_buckets: + api_source_exp = _api_expiration(source_exp) + + if api_source_exp is None: + effective_exp: Optional[dt.datetime] = requested_expiration + elif requested_expiration is None: + effective_exp = api_source_exp else: - effective_exp = requested_expiration + effective_exp = min(api_source_exp, requested_expiration) - # Merge with previous entry if same effective expiration if result and result[-1][1] == effective_exp: result[-1] = (result[-1][0] + consumed, effective_exp) else: result.append((consumed, effective_exp)) - return result -def _calculate_credit_balance_fifo( - session: DbSession, address: str, now: Optional[dt.datetime] = None -) -> int: - """ - Calculate credit balance using FIFO consumption strategy. - - This function implements the core FIFO logic: - 1. Get all positive credits (ordered by message_timestamp) - 2. Get all negative amounts (expenses/transfers) - 3. Apply negative amounts to oldest credits first, but only if the expense - occurred before the credit's expiration date - 4. Return remaining balance considering current expiration status - """ - now = now if now is not None else utc_now() - positive_credits = _apply_fifo_consumption(session, address, now) - total_balance = sum( - c.remaining - for c in positive_credits - if c.expiration_date is None or c.expiration_date > now - ) - return max(0, total_balance) - - -def get_credit_balance_with_details( - session: DbSession, address: str, now: Optional[dt.datetime] = None -) -> Tuple[int, List[CreditBalanceDetail]]: - """ - Calculate credit balance with a breakdown by expiration date. - - Returns (total_balance, details) where details is a list of - CreditBalanceDetail grouped by expiration_date, sorted with - non-expiring (None) first, then by expiration_date ascending. +def _credit_balance_amount_expr(): + """Reusable SQL expression: per-address sum of non-expired bucket amounts. - Always recalculates (bypasses cache) since details are not cached. + Uses ``func.now()`` so the cutoff is evaluated server-side at statement + execution time rather than at expression-construction time in Python. """ - now = now if now is not None else utc_now() - positive_credits = _apply_fifo_consumption(session, address, now) - - details_map: Dict[Optional[dt.datetime], int] = {} - total_balance = 0 - for credit in positive_credits: - if credit.expiration_date is None or credit.expiration_date > now: - if credit.remaining > 0: - total_balance += credit.remaining - key = credit.expiration_date - details_map[key] = details_map.get(key, 0) + credit.remaining - - # Sort: non-expiring first (None), then by expiration_date ascending - details = [ - CreditBalanceDetail(expiration_date=k, amount=v) - for k, v in sorted( - details_map.items(), - key=lambda x: (x[0] is not None, x[0] or dt.datetime.min), - ) - ] - - return max(0, total_balance), details + return func.coalesce( + func.sum(AlephCreditBalanceDb.amount).filter( + AlephCreditBalanceDb.expiration_date > func.now() + ), + 0, + ) def get_credit_balance( session: DbSession, address: str, now: Optional[dt.datetime] = None ) -> int: - """ - Get credit balance using lazy recalculation strategy. + """Sum of non-expired bucket amounts for ``address``, floored at zero. - 1. Check if cached balance exists in credit_balances table - 2. Check if credit_history has newer entries than cached balance - 3. Check if any credits have expiration dates that occurred after the cache's last update - 4. If recalculation is needed, recalculate using FIFO and update cache - 5. Return cached balance + Pure read: no FIFO walk, no write-back. The bucket cache is maintained by + the credit_history writers and by the expiration task. """ - - now = now if now is not None else utc_now() - - # Get the timestamp of the most recent credit history entry for this address - latest_history_timestamp = session.execute( - select(func.max(AlephCreditHistoryDb.last_update)).where( - AlephCreditHistoryDb.address == address + cutoff = now if now is not None else func.now() + result = session.execute( + select(func.coalesce(func.sum(AlephCreditBalanceDb.amount), 0)).where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date > cutoff, ) ).scalar() + return max(0, int(result or 0)) - # If no history exists, balance is 0 - if latest_history_timestamp is None: - return 0 - - # Get cached balance if it exists - cached_balance = session.execute( - select(AlephCreditBalanceDb).where(AlephCreditBalanceDb.address == address) - ).scalar_one_or_none() - - # Check if recalculation is needed - needs_recalculation = ( - cached_balance is None or cached_balance.last_update < latest_history_timestamp - ) - - # Also check if any credits have expiration dates that occurred after the cache's last update - # This handles the case where credits expired since the last cache update - if not needs_recalculation and cached_balance is not None: - # Check for any credits with expiration dates between cache last_update and now - earliest_expiration_after_cache = session.execute( - select(func.min(AlephCreditHistoryDb.expiration_date)).where( - (AlephCreditHistoryDb.address == address) - & (AlephCreditHistoryDb.expiration_date.isnot(None)) - & (AlephCreditHistoryDb.expiration_date > cached_balance.last_update) - & (AlephCreditHistoryDb.expiration_date <= now) - ) - ).scalar() - - needs_recalculation = earliest_expiration_after_cache is not None - if needs_recalculation: - # Recalculate balance using FIFO - new_balance = _calculate_credit_balance_fifo(session, address, now) +def get_credit_balance_with_details( + session: DbSession, address: str, now: Optional[dt.datetime] = None +) -> Tuple[int, List[CreditBalanceDetail]]: + """Per-expiration-date breakdown of an address's non-expired credits. - if cached_balance is None: - # Create new cache entry - session.add( - AlephCreditBalanceDb( - address=address, balance=new_balance, last_update=now - ) - ) - else: - # Update existing cache entry - cached_balance.balance = new_balance - cached_balance.last_update = now + Returns ``(total, details)`` where details are sorted with the + non-expiring bucket first (sentinel mapped back to ``None``), then by + expiration ascending. Zero-amount buckets are filtered out. + """ + cutoff = now if now is not None else func.now() + rows = session.execute( + select(AlephCreditBalanceDb.expiration_date, AlephCreditBalanceDb.amount).where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date > cutoff, + AlephCreditBalanceDb.amount > 0, + ) + ).all() - session.flush() - return new_balance + pairs = [(row.expiration_date, int(row.amount)) for row in rows] + total = max(0, sum(amount for _, amount in pairs)) - return cached_balance.balance if cached_balance else 0 + details = [ + CreditBalanceDetail(expiration_date=_api_expiration(exp), amount=amount) + for exp, amount in sorted( + pairs, + # Sentinel (no expiration) first, then by expiration ascending. + key=lambda x: (x[0] != INFINITY, x[0]), + ) + ] + return total, details def get_credit_balances( @@ -500,16 +448,17 @@ def get_credit_balances( after_address: Optional[str] = None, cursor_mode: bool = False, ) -> list[tuple[str, int]]: + """Paginated list of (address, balance) across all addresses with a + positive non-expired sum. """ - Get paginated credit balances for all addresses. - Uses the cached balances from the credit_balances table. - """ - query = select(AlephCreditBalanceDb.address, AlephCreditBalanceDb.balance) + balance_expr = _credit_balance_amount_expr().label("balance") - if min_balance > 0: - query = query.filter(AlephCreditBalanceDb.balance >= min_balance) - - query = query.order_by(AlephCreditBalanceDb.address.asc()) + query = ( + select(AlephCreditBalanceDb.address, balance_expr) + .group_by(AlephCreditBalanceDb.address) + .having(balance_expr >= max(min_balance, 1)) + .order_by(AlephCreditBalanceDb.address.asc()) + ) if after_address is not None: query = query.where(AlephCreditBalanceDb.address > after_address) @@ -522,22 +471,20 @@ def get_credit_balances( if pagination: query = query.limit(pagination) - # Return results in the expected format (address, credits) - results = session.execute(query).all() - return [(row.address, row.balance) for row in results] + return [(row.address, int(row.balance)) for row in session.execute(query).all()] def count_credit_balances(session: DbSession, min_balance: int = 0) -> int: - """ - Count addresses with credit balances. - Uses the cached balances from the credit_balances table. - """ - query = select(func.count(AlephCreditBalanceDb.address)) - - if min_balance > 0: - query = query.filter(AlephCreditBalanceDb.balance >= min_balance) - - return session.execute(query).scalar_one() + """Count of addresses with a positive non-expired sum (or matching + ``min_balance``).""" + balance_expr = _credit_balance_amount_expr().label("balance") + sub = ( + select(AlephCreditBalanceDb.address) + .group_by(AlephCreditBalanceDb.address) + .having(balance_expr >= max(min_balance, 1)) + .subquery() + ) + return session.execute(select(func.count()).select_from(sub)).scalar_one() def _format_csv_row(*fields) -> str: @@ -624,11 +571,8 @@ def update_credit_balances_distribution( message_hash: str, message_timestamp: dt.datetime, ) -> None: - """ - Updates credit balances for distribution messages (aleph_credit_distribution). - - Distribution messages include all fields like price, bonus_amount, tx_hash, provider, - payment_method, token, chain, and expiration_date. + """Apply a distribution message: grant each entry's bucket and append the + matching credit_history rows. """ last_update = utc_now() @@ -642,20 +586,20 @@ def update_credit_balances_distribution( tx_hash = credit_entry["tx_hash"] provider = credit_entry["provider"] - # Extract optional fields from each credit entry expiration_timestamp = credit_entry.get("expiration") or None origin = credit_entry.get("origin", "") origin_ref = credit_entry.get("ref", "") payment_method = credit_entry.get("payment_method", "") bonus_amount = credit_entry.get("bonus_amount", "") - # Convert expiration timestamp to datetime expiration_date = ( dt.datetime.fromtimestamp(expiration_timestamp / 1000, tz=dt.timezone.utc) if expiration_timestamp is not None else None ) + _apply_grant_bucket(session, address, amount, expiration_date) + csv_rows.append( _format_csv_row( address, @@ -686,15 +630,12 @@ def update_credit_balances_expense( message_hash: str, message_timestamp: dt.datetime, ) -> None: - """ - Updates credit balances for expense messages (aleph_credit_expense). - - Expense messages have negative amounts and can include: - - execution_id (mapped to origin) - - node_id (mapped to tx_hash) - - price (mapped to price) - - time (skipped for now) - - ref (mapped to origin_ref) + """Apply an expense message: drain each entry's amount from the soonest- + expiring non-expired buckets, then append the matching credit_history rows. + + The history row's negative ``amount`` reflects the message intent. If the + address is under-funded, fewer credits are actually consumed (buckets cannot + go negative), matching the prior FIFO behaviour. """ last_update = utc_now() @@ -703,19 +644,18 @@ def update_credit_balances_expense( for index, credit_entry in enumerate(credits_list): address = credit_entry["address"] raw_amount = int(credit_entry["amount"]) - amount = -_apply_credit_precision_multiplier(raw_amount, message_timestamp) + amount = _apply_credit_precision_multiplier(raw_amount, message_timestamp) origin_ref = credit_entry.get("ref", "") - - # Map new fields origin = credit_entry.get("execution_id", "") tx_hash = credit_entry.get("node_id", "") price = credit_entry.get("price", "") - # Skip time field for now + + _consume_address_credits(session, address, amount, now=message_timestamp) csv_rows.append( _format_csv_row( address, - amount, + -amount, message_hash, index, message_timestamp, @@ -744,21 +684,13 @@ def update_credit_balances_transfer( message_hash: str, message_timestamp: dt.datetime, ) -> None: - """ - Updates credit balances for transfer messages (aleph_credit_transfer). - - Transfer messages involve two entries per transfer: - - One or more positive entries for the recipient (adding credits) - - One negative entry for the sender (subtracting credits) - - When a non-whitelisted sender re-transfers credits they received with an expiration - date, the recipient's credits are capped at the original expiration — preventing - bypass of expiration constraints through re-transfers. If the sender's credits have - mixed expirations, multiple positive entries are created for the recipient (one per - expiration group). + """Apply a transfer message: drain the sender's buckets (soonest-expiring + first), grant the resulting amounts to the recipient(s) with each portion + capped at ``min(source_expiration, requested_expiration)``, and append the + matching credit_history rows. - Special case: If sender is in the whitelisted addresses, only add credits to recipient - using the requested expiration as-is (whitelisted senders create credits from nothing). + Whitelisted senders create credits from nothing: the sender is not debited + and the recipient is granted ``amount`` with the requested expiration as-is. """ last_update = utc_now() @@ -766,18 +698,12 @@ def update_credit_balances_transfer( index = 0 is_whitelisted = sender_address in whitelisted_addresses - # Compute sender's remaining credits once for all entries in this transfer - sender_remaining: list[PositiveCredit] = [] - if not is_whitelisted: - sender_remaining = _apply_fifo_consumption(session, sender_address, last_update) - for credit_entry in credits_list: recipient_address = credit_entry["address"] raw_amount = int(credit_entry["amount"]) amount = _apply_credit_precision_multiplier(raw_amount, message_timestamp) expiration_timestamp = credit_entry.get("expiration") or None - # Convert expiration timestamp to datetime requested_expiration = ( dt.datetime.fromtimestamp(expiration_timestamp / 1000, tz=dt.timezone.utc) if expiration_timestamp is not None @@ -785,21 +711,29 @@ def update_credit_balances_transfer( ) if is_whitelisted: - # Whitelisted senders are not constrained by source credits - entries: list[tuple[int, Optional[dt.datetime]]] = [ + entries: List[Tuple[int, Optional[dt.datetime]]] = [ (amount, requested_expiration) ] else: + consumed = _consume_address_credits( + session, sender_address, amount, now=message_timestamp + ) entries = _compute_transfer_entries_by_expiration( - sender_remaining, amount, requested_expiration, last_update + consumed, requested_expiration ) - # Fallback for edge cases where sender credits are not tracked - # (e.g. whitelisted distributions not recorded in history) + # Production transfers are gated by validate_credit_transfer_balance, + # so consumed should sum to ``amount``. Fall back to a single + # ``(amount, requested_expiration)`` entry whenever it doesn't (under- + # funded test scenarios or zero-amount transfers) so the recipient + # still receives a history row matching the message intent. The + # bucket grant below is a no-op for zero-amount entries. if not entries: entries = [(amount, requested_expiration)] - # Add positive entries for recipient (one per expiration group) for entry_amount, entry_expiration in entries: + _apply_grant_bucket( + session, recipient_address, entry_amount, entry_expiration + ) csv_rows.append( _format_csv_row( recipient_address, @@ -822,8 +756,6 @@ def update_credit_balances_transfer( ) index += 1 - # Add negative entry for sender (unless sender is in whitelisted addresses) - # (origin = recipient, provider = ALEPH, payment_method = credit_transfer) if not is_whitelisted: csv_rows.append( _format_csv_row( diff --git a/src/aleph/db/connection.py b/src/aleph/db/connection.py index c42921869..47f57e77e 100644 --- a/src/aleph/db/connection.py +++ b/src/aleph/db/connection.py @@ -7,8 +7,14 @@ from sqlalchemy.orm import sessionmaker from aleph.config import get_config +from aleph.toolkit.infinity import register_infinity_adapter from aleph.types.db_session import DbSessionFactory +# Make psycopg2 emit PG ``infinity::timestamptz`` for the credit-balance +# expiration sentinel. The adapter is global to psycopg2, so registering once +# at module import time covers every connection opened later in this process. +register_infinity_adapter() + def make_db_url( driver: str, config: Config, application_name: Optional[str] = None diff --git a/src/aleph/db/models/balances.py b/src/aleph/db/models/balances.py index c4ef27d6b..b7e6b6056 100644 --- a/src/aleph/db/models/balances.py +++ b/src/aleph/db/models/balances.py @@ -70,10 +70,20 @@ class AlephCreditHistoryDb(Base): class AlephCreditBalanceDb(Base): + """Eagerly-maintained credit balance cache, one row per (address, expiration_date). + + For credits without an expiration, ``aleph.toolkit.infinity.INFINITY`` + (PG ``'infinity'::timestamptz``) is used so the primary key columns can + stay NOT NULL while reads still treat the bucket as never expiring. + """ + __tablename__ = "credit_balances" - address: Mapped[str] = mapped_column(String, primary_key=True, index=True) - balance: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0) + address: Mapped[str] = mapped_column(String, primary_key=True) + expiration_date: Mapped[dt.datetime] = mapped_column( + TIMESTAMP(timezone=True), primary_key=True + ) + amount: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0) last_update: Mapped[dt.datetime] = mapped_column( TIMESTAMP(timezone=True), nullable=False, diff --git a/src/aleph/repair.py b/src/aleph/repair.py index 2f939140a..2569f3611 100644 --- a/src/aleph/repair.py +++ b/src/aleph/repair.py @@ -1,11 +1,14 @@ import logging from aleph_message.models import ItemHash -from sqlalchemy import select +from sqlalchemy import delete, select +from sqlalchemy.dialects.postgresql import insert as pg_insert from aleph.db.accessors.files import upsert_file -from aleph.db.models import StoredFileDb +from aleph.db.models import AlephCreditBalanceDb, AlephCreditHistoryDb, StoredFileDb from aleph.storage import StorageService +from aleph.toolkit.infinity import INFINITY +from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSession, DbSessionFactory LOGGER = logging.getLogger(__name__) @@ -46,6 +49,93 @@ async def _fix_file_sizes( ) +def _rebuild_credit_buckets_for_address(session: DbSession, address: str) -> None: + """Replay ``credit_history`` chronologically under the sort-by-expiration + policy and replace this address's buckets with the resulting state. + + Idempotent: clears existing buckets for the address first, then rebuilds. + Safe to interrupt at address granularity (callers commit per-address). + """ + session.execute( + delete(AlephCreditBalanceDb).where(AlephCreditBalanceDb.address == address) + ) + + records = ( + session.execute( + select(AlephCreditHistoryDb) + .where(AlephCreditHistoryDb.address == address) + .order_by( + AlephCreditHistoryDb.message_timestamp.asc(), + AlephCreditHistoryDb.credit_ref.asc(), + AlephCreditHistoryDb.credit_index.asc(), + ) + ) + .scalars() + .all() + ) + + # Bucket state in memory: {expiration_date (sentinel for None): amount}. + buckets: dict = {} + + def _bucket_key(expiration): + return expiration if expiration is not None else INFINITY + + for record in records: + if record.amount > 0: + key = _bucket_key(record.expiration_date) + buckets[key] = buckets.get(key, 0) + record.amount + else: + remaining = -int(record.amount) + # Soonest-expiring first, ignoring buckets that have already expired + # at the moment of the historical expense. + keys_in_order = sorted(buckets.keys()) + for key in keys_in_order: + if remaining <= 0: + break + if key <= record.message_timestamp: + continue + available = buckets[key] + if available <= 0: + continue + take = min(available, remaining) + buckets[key] = available - take + remaining -= take + + now = utc_now() + rows = [ + {"address": address, "expiration_date": key, "amount": amount} + for key, amount in buckets.items() + if amount > 0 and key > now + ] + if rows: + session.execute(pg_insert(AlephCreditBalanceDb).values(rows)) + + +def _repair_credit_balances(session_factory: DbSessionFactory) -> None: + """Bootstrap or repair the credit_balances bucket cache from credit_history. + + Rebuilds buckets for every address that has any credit_history rows. This is + idempotent and runs on every startup; after the initial bootstrap it's a + bounded full-table scan plus per-address rebuild, which is acceptable given + typical address counts. + """ + with session_factory() as session: + addresses = list( + session.execute(select(AlephCreditHistoryDb.address).distinct()).scalars() + ) + + LOGGER.info("Repairing credit_balances for %d address(es)", len(addresses)) + + for i, address in enumerate(addresses): + with session_factory() as session: + _rebuild_credit_buckets_for_address(session, address) + session.commit() + if (i + 1) % 500 == 0: + LOGGER.info("Repaired %d / %d", i + 1, len(addresses)) + + LOGGER.info("Credit balances repair complete (%d address(es))", len(addresses)) + + async def repair_node( storage_service: StorageService, session_factory: DbSessionFactory ): @@ -53,3 +143,6 @@ async def repair_node( with session_factory() as session: await _fix_file_sizes(session, storage_service, store_files=True) session.commit() + + LOGGER.info("Repairing credit balances") + _repair_credit_balances(session_factory) diff --git a/src/aleph/services/credit_expiration.py b/src/aleph/services/credit_expiration.py new file mode 100644 index 000000000..a3f3ca8ee --- /dev/null +++ b/src/aleph/services/credit_expiration.py @@ -0,0 +1,163 @@ +"""Background task that drops expired credit_balances bucket rows. + +Replaces the read-path FIFO walk that previously filtered expired credits +on every balance read. The task sleeps until the next expiration timestamp +present in ``credit_balances`` and is woken via a Postgres ``LISTEN`` on +``credit_expiration_changed``. Writers in any worker process ``NOTIFY`` +that channel inside their transaction; Postgres delivers the notification +after commit so the wake reflects committed state. + +Single instance per CCN process (the main process — message processing +runs in spawned subprocesses, so the in-process ``asyncio.Event`` is *not* +shared across them; the database is the rendezvous point). +""" + +import asyncio +import logging +from typing import Optional + +import psycopg2 +import psycopg2.extensions +from configmanager import Config +from sqlalchemy import delete, func, select + +from aleph.db.connection import make_db_url +from aleph.db.models import AlephCreditBalanceDb +from aleph.toolkit.infinity import INFINITY +from aleph.toolkit.timestamp import utc_now +from aleph.types.db_session import DbSessionFactory + +LOGGER = logging.getLogger(__name__) + +NOTIFY_CHANNEL = "credit_expiration_changed" + +# Fallback timeout when no expiration is pending. Long enough to be cheap, +# short enough that a missed NOTIFY (e.g. transient driver glitch) recovers +# in bounded time. The NOTIFY path is the primary wake mechanism. +_IDLE_TIMEOUT_SECONDS = 60 * 60 # 1 hour + + +class CreditExpirationTask: + """Single long-running task that deletes expired bucket rows. + + Holds a dedicated psycopg2 connection in autocommit mode for the LISTEN + side (separate from the SQLAlchemy pool so the LISTEN doesn't tie up a + regular connection). The connection's fd is registered with the event + loop via ``loop.add_reader``: when Postgres pushes a notification, the + reader callback drains ``connection.notifies`` and sets the local event. + + Each iteration: + 1. ``DELETE FROM credit_balances WHERE expiration_date <= now()``. + 2. ``SELECT MIN(expiration_date)`` excluding the ``infinity`` sentinel. + 3. ``asyncio.wait_for(event.wait(), timeout=delta)`` if there's a + finite expiration, else unbounded ``event.wait()`` until a NOTIFY. + """ + + def __init__(self, session_factory: DbSessionFactory, config: Config) -> None: + self.session_factory = session_factory + self._config = config + self._event = asyncio.Event() + self._listen_conn: Optional[psycopg2.extensions.connection] = None + self._fd: Optional[int] = None + + async def run(self) -> None: + loop = asyncio.get_event_loop() + self._listen_conn = self._open_listen_connection() + self._fd = self._listen_conn.fileno() + loop.add_reader(self._fd, self._on_listen_readable) + LOGGER.info("Credit expiration task started (LISTEN %s)", NOTIFY_CHANNEL) + try: + while True: + await self._tick() + except asyncio.CancelledError: + LOGGER.info("Credit expiration task cancelled") + raise + finally: + if self._fd is not None: + loop.remove_reader(self._fd) + if self._listen_conn is not None: + self._listen_conn.close() + + def _open_listen_connection(self) -> psycopg2.extensions.connection: + dsn = make_db_url( + driver="psycopg2", + config=self._config, + application_name="credit-expiration-listen", + ) + # SQLAlchemy URL is psycopg2-compatible after stripping the dialect prefix. + prefix = "postgresql+psycopg2://" + if dsn.startswith(prefix): + dsn = "postgresql://" + dsn[len(prefix) :] + conn = psycopg2.connect(dsn) + conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur = conn.cursor() + cur.execute(f"LISTEN {NOTIFY_CHANNEL};") + cur.close() + return conn + + def _on_listen_readable(self) -> None: + """Selector callback: drain pending notifications and wake the task.""" + conn = self._listen_conn + if conn is None: + return + try: + conn.poll() + except Exception: + LOGGER.exception("LISTEN connection poll failed") + return + if conn.notifies: + conn.notifies.clear() + self._event.set() + + async def _tick(self) -> None: + self._delete_expired() + + next_exp = self._peek_next_expiration() + if next_exp is None: + # Nothing pending: wait until a writer NOTIFY-s or the idle + # timeout fires (defence-in-depth re-poll). + try: + await asyncio.wait_for( + self._event.wait(), timeout=_IDLE_TIMEOUT_SECONDS + ) + except asyncio.TimeoutError: + pass + finally: + self._event.clear() + return + + timeout = (next_exp - utc_now()).total_seconds() + if timeout <= 0: + # Already past the next expiration: re-sweep immediately. + return + + try: + await asyncio.wait_for(self._event.wait(), timeout=timeout) + except asyncio.TimeoutError: + pass + finally: + self._event.clear() + + def _peek_next_expiration(self): + with self.session_factory() as session: + return session.execute( + select(func.min(AlephCreditBalanceDb.expiration_date)).where( + AlephCreditBalanceDb.expiration_date > utc_now(), + AlephCreditBalanceDb.expiration_date < INFINITY, + AlephCreditBalanceDb.amount > 0, + ) + ).scalar() + + def _delete_expired(self) -> None: + with self.session_factory() as session: + result = session.execute( + delete(AlephCreditBalanceDb).where( + AlephCreditBalanceDb.expiration_date <= utc_now() + ) + ) + session.commit() + # session.execute(delete(...)) returns a CursorResult at runtime; + # the static Result alias hides ``rowcount`` so cast for mypy. + rowcount = getattr(result, "rowcount", 0) + if rowcount: + LOGGER.info("Dropped %d expired credit bucket(s)", rowcount) diff --git a/src/aleph/toolkit/infinity.py b/src/aleph/toolkit/infinity.py new file mode 100644 index 000000000..fedef9d54 --- /dev/null +++ b/src/aleph/toolkit/infinity.py @@ -0,0 +1,103 @@ +"""``INFINITY`` sentinel for the credit_balances expiration column. + +The bucket cache models "no expiration" as a real timestamptz with PG's +``infinity`` value, not NULL. That keeps the composite primary key columns +NOT NULL and lets every read query use the same ``expiration_date > NOW()`` +shape without an ``IS NULL`` branch. + +The Python side uses a ``datetime`` subclass so we can: + + * register a psycopg2 adapter that intercepts just this sentinel and + emits ``'infinity'::timestamptz`` on the wire; regular ``datetime`` + values keep the default adapter; and + * register a typecaster that decodes PG ``infinity::timestamptz`` back to + an ``_Infinity`` instance (rather than a plain ``datetime(9999, ...)``) + so subsequent ORM round-trips serialise it as PG ``infinity`` again. + Without the typecaster the ORM would track a regular datetime for the + PK and re-serialise it as a year-9999 literal, which is *not* equal to + PG ``infinity`` at the SQL level and would cause UPDATE WHERE PK to + miss the row (StaleDataError). + +Round-trip assumption: SQLAlchemy's ``DateTime`` result processor for the +psycopg2 dialect is a passthrough, so the ``_Infinity`` subclass produced +by our typecaster reaches the ORM unchanged and the outbound adapter sees +``_Infinity`` (not a normalised ``datetime``) on the UPDATE path. If a +future SQLAlchemy version starts coercing timestamptz values during result +processing, the symptom would be loud (StaleDataError on bucket UPDATE), +not silent corruption; the fallback would be to expose this sentinel via a +custom ``TypeDecorator`` that coerces year-9999/us-999999 values back to +``_Infinity`` on load. +""" + +import datetime as dt + +import psycopg2.extensions + +_TIMESTAMPTZ_OID = 1184 # pg_type.oid for timestamptz + + +class _Infinity(dt.datetime): + """``datetime`` subclass whose only canonical instance is ``INFINITY``. + + Subclassing ``datetime`` lets the value participate in normal comparison + and arithmetic with other datetimes (it sorts after every representable + timestamptz), while remaining a distinct type for adapter dispatch. + """ + + __slots__ = () + + def __new__(cls) -> "_Infinity": + # Matches what psycopg2 returns when decoding PG ``infinity::timestamptz`` + # so reads from the DB compare equal to this sentinel via datetime equality. + return dt.datetime.__new__( + cls, 9999, 12, 31, 23, 59, 59, 999999, tzinfo=dt.timezone.utc + ) + + def __repr__(self) -> str: + return "INFINITY" + + +INFINITY: _Infinity = _Infinity() + + +def _adapt_infinity(_value: _Infinity) -> psycopg2.extensions.AsIs: + return psycopg2.extensions.AsIs("'infinity'::timestamptz") + + +_DEFAULT_TIMESTAMPTZ = psycopg2.extensions.PYDATETIMETZ + + +def _cast_timestamptz(value, cur): + """Replacement typecaster for OID 1184 that decodes PG ``infinity`` to ``INFINITY``.""" + if value == "infinity": + return INFINITY + return _DEFAULT_TIMESTAMPTZ(value, cur) + + +_INFINITY_AWARE_TIMESTAMPTZ = psycopg2.extensions.new_type( + (_TIMESTAMPTZ_OID,), + "TIMESTAMPTZ_INFINITY", + _cast_timestamptz, +) + + +_registered = False + + +def register_infinity_adapter() -> None: + """Register psycopg2 to round-trip ``INFINITY`` <-> PG ``infinity::timestamptz``. + + * Outbound: ``_Infinity`` instances serialise to ``'infinity'::timestamptz``. + * Inbound: ``timestamptz infinity`` values decode to ``INFINITY`` (the + ``_Infinity`` singleton), so the ORM tracks a value that re-serialises + as PG ``infinity`` on subsequent UPDATE statements. + + Idempotent. Safe to call at module-import time before any connections + are opened; adapters and typecasters are global to the psycopg2 module. + """ + global _registered + if _registered: + return + psycopg2.extensions.register_adapter(_Infinity, _adapt_infinity) + psycopg2.extensions.register_type(_INFINITY_AWARE_TIMESTAMPTZ) + _registered = True diff --git a/tests/db/test_credit_balances.py b/tests/db/test_credit_balances.py index 8cbffe840..97582a0b1 100644 --- a/tests/db/test_credit_balances.py +++ b/tests/db/test_credit_balances.py @@ -3,7 +3,7 @@ from decimal import Decimal from typing import Any, Dict, List -from sqlalchemy import select +from sqlalchemy import delete, select from sqlalchemy import update as sql_update from aleph.db.accessors.balances import ( @@ -618,19 +618,24 @@ def test_balance_fix_doesnt_affect_valid_credits(session_factory: DbSessionFacto assert balance > 0 -def test_fifo_scenario_1_non_expiring_first_equals_0_remaining( +def test_fifo_scenario_1_non_expiring_first_equals_500_remaining( session_factory: DbSessionFactory, ): """ - FIFO Scenario 1: Non-expiring credits received FIRST → Result: 0 remaining + Sort-by-expiration Scenario 1: non-expiring grant FIRST, expiring grant SECOND. Setup: - 1000 non-expiring credits (received FIRST at T1) - 1000 expiring credits (received SECOND at T2, expire at T4) - 1500 expense at T3 (before expiration at T4) - FIFO Consumption: 1000 (non-expiring) + 500 (expiring) = 1500 total consumed - Final Balance: 0 (non-expiring remaining) + 500 (expiring remaining but expired) = 0 + Sort-by-expiration consumption: the expiring bucket has the earlier + expiration_date and is drained first (1000), then 500 from the non-expiring + bucket. After expiration the expiring bucket is dropped by the cron, so the + final balance is the 500 surviving in the non-expiring bucket. + + Under the previous sort-by-issuance FIFO this test asserted 0; the policy + change makes the user lose fewer credits to expiration. """ # Use fixed timestamps before the credit precision cutoff (2026-02-02) @@ -721,8 +726,9 @@ def test_fifo_scenario_1_non_expiring_first_equals_0_remaining( session, "0xcorner_case_user", now_time ) - # Expected: 0 remaining (all non-expiring consumed, expiring remainder expired) - expected_balance = 0 + # Expected: 500 * 10000 remaining in the non-expiring bucket, because + # the expiring bucket was drained first under sort-by-expiration. + expected_balance = 500 * 10000 assert ( balance_after_expiration == expected_balance ), f"Scenario 1: Expected {expected_balance} remaining credits, got {balance_after_expiration}" @@ -838,97 +844,39 @@ def test_fifo_scenario_2_expiring_first_equals_500_remaining( ), f"Scenario 2: Expected {expected_balance} remaining credits, got {balance_after_expiration}" -def test_cache_invalidation_on_credit_expiration(session_factory: DbSessionFactory): +def test_expired_credits_filtered_out_by_read(session_factory: DbSessionFactory): + """A bucket whose ``expiration_date`` is in the past is not counted by + ``get_credit_balance``. This replaces the prior cache-invalidation test: + there is no lazy recompute; the read query just filters by + ``expiration_date > now()``. """ - Test that cached balances are recalculated when credits expire. - - This test covers the bug where cached balances were not being recalculated - when credits expired after the cache was last updated. - - Bug scenario: - 1. Credit with expiration date X is added at time T1 - 2. Cache is calculated at time T2 (where T1 < T2 < X) - 3. Current time is T3 (where T2 < X < T3, so credit has expired) - 4. Without the fix, cached balance would be returned incorrectly - 5. With the fix, balance is recalculated because credit expired after cache update - """ - - # Use fixed timestamps before the credit precision cutoff (2026-02-02) - # to ensure the 10000x multiplier is applied consistently - # Base time (T3): 2023-06-15 12:00:00 UTC - base_time = 1686830400 - - # Time T1: Add credit (1 hour before base_time) + base_time = 1686830400 # 2023-06-15 12:00:00 UTC (pre precision cutoff) credit_time = dt.datetime.fromtimestamp(base_time - 3600, tz=dt.timezone.utc) - - # Time T2: Cache calculation time (30 minutes before base_time, before expiration) - cache_time = dt.datetime.fromtimestamp(base_time - 1800, tz=dt.timezone.utc) - - # Time X: Credit expiration (5 minutes before base_time, between cache time and T3) - expiration_time = int((base_time - 300) * 1000) - - # Time T3: Current time for final balance check (after expiration) + expiration_time_ms = int((base_time - 300) * 1000) # expired 5 minutes before now now_time = dt.datetime.fromtimestamp(base_time, tz=dt.timezone.utc) with session_factory() as session: - # Step 1: Add credit with expiration date - credits_list = [ - { - "address": "0xcache_bug_user", - "amount": 1000, - "price": "1.0", - "tx_hash": "0xcache_test", - "provider": "test_provider", - "expiration": expiration_time, # Will expire at T3 - } - ] - update_credit_balances_distribution( session=session, - credits_list=credits_list, + credits_list=[ + { + "address": "0xexpired_user", + "amount": 1000, + "price": "1.0", + "tx_hash": "0xexpired", + "provider": "test_provider", + "expiration": expiration_time_ms, + } + ], token="TEST", chain="ETH", - message_hash="cache_expiration_test", - message_timestamp=credit_time, # T1 - ) - session.commit() - - # Step 2: Simulate cache being calculated at T2 (before expiration) - # Mock utc_now to return cache_time during first balance calculation - balance_before_expiration = get_credit_balance( - session, "0xcache_bug_user", cache_time + message_hash="expired_credits_test", + message_timestamp=credit_time, ) session.commit() - # Verify that at T2, the balance was 10000000 (1000 * 10000 multiplier, credit not yet expired) - assert balance_before_expiration == 10000000 - - # Verify that a cache entry was created and manually update its timestamp - # to simulate it being created at T2 (cache_time) - - cached_balance = session.execute( - select(AlephCreditBalanceDb).where( - AlephCreditBalanceDb.address == "0xcache_bug_user" - ) - ).scalar_one_or_none() - - assert cached_balance is not None - assert cached_balance.balance == 10000000 - assert cached_balance.last_update == cache_time - - # Step 3: Now check balance at current time (T3, after expiration) - # The fix should detect that credit expired after cache update and recalculate - balance_after_expiration = get_credit_balance( - session, "0xcache_bug_user", now_time - ) - - # Expected: 0 (credit has expired) - assert balance_after_expiration == 0 - - # Verify that cache was updated (should have a newer timestamp) - session.refresh(cached_balance) - assert cached_balance.balance == 0 - assert cached_balance.last_update == now_time + # Bucket row exists with the past expiration, but the read filters it out. + assert get_credit_balance(session, "0xexpired_user", now_time) == 0 def test_get_resource_consumed_credits_no_records(session_factory: DbSessionFactory): @@ -1550,9 +1498,35 @@ def test_chain_transfer_a_b_c_expiration_and_balances( def _insert_credit_history_entries(session, entries: List[Dict[str, Any]]): - """Helper to bulk-insert credit history rows for testing.""" + """Helper to bulk-insert credit history rows for testing. + + Also seeds the bucket cache so ``get_credit_balance(_with_details)`` see + the credits without going through the message writers. Positive amounts + add to a bucket at the entry's expiration_date; negative amounts drain + soonest-first from existing buckets, matching the writer semantics. + """ + from aleph.db.accessors.balances import ( + _apply_grant_bucket, + _consume_address_credits, + ) + for entry in entries: session.add(AlephCreditHistoryDb(**entry)) + amount = int(entry["amount"]) + if amount > 0: + _apply_grant_bucket( + session, entry["address"], amount, entry.get("expiration_date") + ) + else: + # Use the entry's message_timestamp as the validity cutoff so + # backdated test scenarios drain buckets that were valid at the + # historical expense time (matching the writer's eager-write path). + _consume_address_credits( + session, + entry["address"], + -amount, + now=entry.get("message_timestamp"), + ) session.flush() @@ -1649,12 +1623,18 @@ def test_credit_balance_details_mixed_expiration(session_factory: DbSessionFacto def test_credit_balance_details_partial_consumption(session_factory: DbSessionFactory): - """Details are accurate after FIFO partial consumption.""" + """Details are accurate after partial consumption under sort-by-expiration. + + Setup: 1000 non-expiring + 500 expiring at exp1, expense 700. + Sort-by-expiration drains the expiring (soonest) bucket first: 500 from + exp1, then 200 from non-expiring. Only the non-expiring bucket has a + positive amount left, so ``details`` is a single entry. + """ ts = dt.datetime(2026, 3, 1, tzinfo=dt.timezone.utc) exp1 = dt.datetime(2026, 9, 1, tzinfo=dt.timezone.utc) entries = [ - # Non-expiring credit (oldest, consumed first by FIFO) + # Non-expiring credit { "address": "0xdetails3", "amount": 1000, @@ -1664,7 +1644,7 @@ def test_credit_balance_details_partial_consumption(session_factory: DbSessionFa "last_update": ts, "expiration_date": None, }, - # Expiring credit + # Expiring credit (will be drained first under sort-by-expiration) { "address": "0xdetails3", "amount": 500, @@ -1674,7 +1654,7 @@ def test_credit_balance_details_partial_consumption(session_factory: DbSessionFa "last_update": ts, "expiration_date": exp1, }, - # Expense consuming 700 from the oldest (non-expiring) credit + # Expense consuming 700 (drains 500 from exp1 then 200 from non-expiring) { "address": "0xdetails3", "amount": -700, @@ -1694,13 +1674,12 @@ def test_credit_balance_details_partial_consumption(session_factory: DbSessionFa total, details = get_credit_balance_with_details( session=session, address="0xdetails3" ) - # 1000 - 700 = 300 non-expiring remaining, 500 expiring remaining + # Soonest-first drain: exp1 -> 0, non-expiring -> 800. Only the + # non-expiring bucket has a positive remainder. assert total == 800 - assert len(details) == 2 + assert len(details) == 1 assert details[0].expiration_date is None - assert details[0].amount == 300 - assert details[1].expiration_date == exp1 - assert details[1].amount == 500 + assert details[0].amount == 800 def test_credit_balance_details_fully_consumed(session_factory: DbSessionFactory): @@ -1807,7 +1786,10 @@ def test_credit_balance_details_no_history(session_factory: DbSessionFactory): def test_credit_balance_details_matches_total(session_factory: DbSessionFactory): - """Sum of details amounts equals the total balance.""" + """Sum of details amounts equals the total balance, with the + sort-by-expiration policy producing a different per-bucket breakdown + than the legacy sort-by-issuance one (same total). + """ ts = dt.datetime(2026, 3, 1, tzinfo=dt.timezone.utc) exp1 = dt.datetime(2026, 9, 1, tzinfo=dt.timezone.utc) exp2 = dt.datetime(2026, 12, 1, tzinfo=dt.timezone.utc) @@ -1840,7 +1822,8 @@ def test_credit_balance_details_matches_total(session_factory: DbSessionFactory) "last_update": ts, "expiration_date": exp2, }, - # Expense consuming 1200 total (FIFO: 1000 from non-expiring, 200 from exp1) + # Expense consuming 1200. Sort-by-expiration drains exp1 (800) then + # exp2 (400); non-expiring (sentinel) is untouched. { "address": "0xdetails6", "amount": -1200, @@ -1860,16 +1843,18 @@ def test_credit_balance_details_matches_total(session_factory: DbSessionFactory) total, details = get_credit_balance_with_details( session=session, address="0xdetails6" ) - # 1000 - 1000 = 0 non-expiring, 800 - 200 = 600 exp1, 600 exp2 + # 1000 non-expiring + 0 exp1 + 200 exp2 = 1200 assert total == 1200 details_sum = sum(d.amount for d in details) assert details_sum == total + # Details order: non-expiring first (None), then by expiration ASC. + # exp1 is drained to 0 so it's absent from the positive-amount list. assert len(details) == 2 - assert details[0].expiration_date == exp1 - assert details[0].amount == 600 + assert details[0].expiration_date is None + assert details[0].amount == 1000 assert details[1].expiration_date == exp2 - assert details[1].amount == 600 + assert details[1].amount == 200 # ── Volume (origin_ref) consumed credits tests ─────────────────────── @@ -2488,3 +2473,461 @@ def test_cursor_pagination_nullable_sort_nulls_on_last_page( ) assert len(all_refs) == 5 assert len(set(all_refs)) == 5 + + +# --------------------------------------------------------------------------- +# Bucket-cache eager-update behaviour +# --------------------------------------------------------------------------- + +from aleph.db.accessors.balances import ( # noqa: E402 + _apply_grant_bucket, + _compute_transfer_entries_by_expiration, + _consume_address_credits, + count_credit_balances, + get_credit_balances, +) +from aleph.toolkit.infinity import INFINITY # noqa: E402 + + +def _bucket_amount(session, address, expiration_date) -> int: + row = session.execute( + select(AlephCreditBalanceDb).where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date == expiration_date, + ) + ).scalar_one_or_none() + return 0 if row is None else int(row.amount) + + +def test_apply_grant_bucket_inserts_and_increments( + session_factory: DbSessionFactory, +): + """Grants insert a new bucket and increment an existing one (same expiration).""" + exp = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xbucket1", 100, exp) + session.commit() + assert _bucket_amount(session, "0xbucket1", exp) == 100 + + _apply_grant_bucket(session, "0xbucket1", 50, exp) + session.commit() + assert _bucket_amount(session, "0xbucket1", exp) == 150 + + +def test_apply_grant_bucket_none_expiration_uses_sentinel( + session_factory: DbSessionFactory, +): + """A None expiration maps to the INFINITY sentinel bucket.""" + with session_factory() as session: + _apply_grant_bucket(session, "0xnoexp", 200, None) + session.commit() + assert _bucket_amount(session, "0xnoexp", INFINITY) == 200 + + +def test_consume_address_credits_drains_soonest_first( + session_factory: DbSessionFactory, +): + """Consumption drains the soonest-expiring non-expired bucket first.""" + exp_soon = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + exp_late = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xsoonest", 100, exp_late) + _apply_grant_bucket(session, "0xsoonest", 100, exp_soon) + session.commit() + + consumed = _consume_address_credits(session, "0xsoonest", 150) + session.commit() + + # 100 from soon, 50 from late. + assert consumed == [(100, exp_soon), (50, exp_late)] + assert _bucket_amount(session, "0xsoonest", exp_soon) == 0 + assert _bucket_amount(session, "0xsoonest", exp_late) == 50 + + +def test_consume_address_credits_skips_expired_buckets( + session_factory: DbSessionFactory, +): + """A bucket whose expiration has already passed is not drained.""" + past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xskipexpired", 100, past) + _apply_grant_bucket(session, "0xskipexpired", 100, future) + session.commit() + + consumed = _consume_address_credits(session, "0xskipexpired", 100) + session.commit() + + assert consumed == [(100, future)] + assert _bucket_amount(session, "0xskipexpired", past) == 100 + assert _bucket_amount(session, "0xskipexpired", future) == 0 + + +def test_consume_address_credits_underfunded_returns_partial( + session_factory: DbSessionFactory, +): + """Consuming more than available drains everything available and stops at zero.""" + exp = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xshort", 75, exp) + session.commit() + + consumed = _consume_address_credits(session, "0xshort", 200) + session.commit() + + assert consumed == [(75, exp)] + assert _bucket_amount(session, "0xshort", exp) == 0 + + +def test_get_credit_balance_sums_non_expired_buckets( + session_factory: DbSessionFactory, +): + past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xsum", 100, past) + _apply_grant_bucket(session, "0xsum", 250, future) + _apply_grant_bucket(session, "0xsum", 50, None) + session.commit() + assert get_credit_balance(session, "0xsum") == 300 + + +def test_get_credit_balance_floors_at_zero(session_factory: DbSessionFactory): + """Negative bucket amounts (defensive) are floored at zero on read.""" + with session_factory() as session: + session.add( + AlephCreditBalanceDb( + address="0xneg", + expiration_date=INFINITY, + amount=-10, + ) + ) + session.commit() + assert get_credit_balance(session, "0xneg") == 0 + + +def test_get_credit_balance_with_details_groups_by_expiration( + session_factory: DbSessionFactory, +): + """``get_credit_balance_with_details`` returns one entry per bucket with + the no-expiration sentinel mapped back to None and sorted first. + """ + exp_a = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + exp_b = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xdetails", 100, exp_b) + _apply_grant_bucket(session, "0xdetails", 50, None) + _apply_grant_bucket(session, "0xdetails", 25, exp_a) + session.commit() + + total, details = get_credit_balance_with_details(session, "0xdetails") + assert total == 175 + # No-expiration first, then earliest expiration next. + assert [d.expiration_date for d in details] == [None, exp_a, exp_b] + assert [d.amount for d in details] == [50, 25, 100] + + +def test_compute_transfer_entries_caps_at_source_and_requested(): + """``min(source, requested)`` is applied per consumed slice; same effective + expiration entries are merged.""" + source_exp = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + later_requested = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + earlier_requested = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + + # Requested later than source: source wins. + assert _compute_transfer_entries_by_expiration( + [(100, source_exp)], later_requested + ) == [(100, source_exp)] + + # Requested earlier than source: requested wins. + assert _compute_transfer_entries_by_expiration( + [(100, source_exp)], earlier_requested + ) == [(100, earlier_requested)] + + # Source is the sentinel (no expiration): requested propagates. + assert _compute_transfer_entries_by_expiration( + [(100, INFINITY)], later_requested + ) == [(100, later_requested)] + + # Source is the sentinel and no requested: recipient gets no expiration. + assert _compute_transfer_entries_by_expiration([(100, INFINITY)], None) == [ + (100, None) + ] + + # Two consumed slices with the same effective expiration merge. + assert _compute_transfer_entries_by_expiration( + [(50, source_exp), (50, INFINITY)], earlier_requested + ) == [(100, earlier_requested)] + + +def test_distribution_creates_buckets_with_correct_expiration( + session_factory: DbSessionFactory, +): + """Distribution rows produce buckets at exactly the message expiration_date.""" + ts = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + exp_dt = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + exp_ms = int(exp_dt.timestamp() * 1000) + + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xnewdist", + "amount": 42, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + "expiration": exp_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="msg_new_dist", + message_timestamp=ts, + ) + session.commit() + + # Bucket exists at exactly the expected expiration (not the sentinel). + assert _bucket_amount(session, "0xnewdist", exp_dt) == 42 * 10000 + assert _bucket_amount(session, "0xnewdist", INFINITY) == 0 + + +def test_expense_drains_soonest_bucket_first(session_factory: DbSessionFactory): + """An expense first drains the soonest-expiring bucket. The history row + still records the full intended amount even if buckets are under-drained. + """ + dist1 = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + dist2 = dt.datetime(2023, 1, 2, 12, 0, 0, tzinfo=dt.timezone.utc) + exp_dt = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + exp_ms = int(exp_dt.timestamp() * 1000) + expense_ts = dt.datetime(2023, 1, 3, 12, 0, 0, tzinfo=dt.timezone.utc) + + with session_factory() as session: + # Bucket A: 100 expiring at exp_dt (issued FIRST). + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xexpense", + "amount": 100, + "price": "0.1", + "tx_hash": "0xa", + "provider": "p", + "expiration": exp_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="dist_a", + message_timestamp=dist1, + ) + # Bucket B: 200 non-expiring (issued SECOND). + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xexpense", + "amount": 200, + "price": "0.1", + "tx_hash": "0xb", + "provider": "p", + } + ], + token="ALEPH", + chain="ETH", + message_hash="dist_b", + message_timestamp=dist2, + ) + # Expense 150: soonest-first drains A entirely (100) then 50 from B. + update_credit_balances_expense( + session=session, + credits_list=[{"address": "0xexpense", "amount": 150, "ref": "r"}], + message_hash="exp_msg", + message_timestamp=expense_ts, + ) + session.commit() + + assert _bucket_amount(session, "0xexpense", exp_dt) == 0 + assert _bucket_amount(session, "0xexpense", INFINITY) == 150 * 10000 + # The history row carries the full intent. + record = session.execute( + select(AlephCreditHistoryDb).where( + AlephCreditHistoryDb.credit_ref == "exp_msg" + ) + ).scalar_one() + assert record.amount == -150 * 10000 + + +def test_transfer_recipient_inherits_capped_expiration( + session_factory: DbSessionFactory, +): + """Sender's soonest-expiring bucket is drained first; recipient inherits + min(source_exp, requested_exp). + """ + dist_ts = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + xfer_ts = dt.datetime(2023, 2, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + soon = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + soon_ms = int(soon.timestamp() * 1000) + requested = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + requested_ms = int(requested.timestamp() * 1000) + + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xsender", + "amount": 100, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + "expiration": soon_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="dist_sender", + message_timestamp=dist_ts, + ) + update_credit_balances_transfer( + session=session, + credits_list=[ + {"address": "0xrecipient", "amount": 50, "expiration": requested_ms} + ], + sender_address="0xsender", + whitelisted_addresses=[], + message_hash="xfer", + message_timestamp=xfer_ts, + ) + session.commit() + + # Sender drained from the soon bucket. + assert _bucket_amount(session, "0xsender", soon) == 50 * 10000 + # Recipient bucket capped at source's earlier expiration. + assert _bucket_amount(session, "0xrecipient", soon) == 50 * 10000 + assert _bucket_amount(session, "0xrecipient", requested) == 0 + + +def test_count_and_list_credit_balances(session_factory: DbSessionFactory): + """Multi-address aggregation: ``get_credit_balances`` and + ``count_credit_balances`` aggregate per-address sums of non-expired buckets. + """ + past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xa", 100, future) + _apply_grant_bucket(session, "0xb", 50, future) + _apply_grant_bucket(session, "0xc", 999, past) # expired + _apply_grant_bucket(session, "0xd", 0, future) # zero, no positive sum + session.commit() + + rows = get_credit_balances(session) + addresses = {address for address, _ in rows} + # 0xc has only expired credits, 0xd has zero — both excluded. + assert addresses == {"0xa", "0xb"} + assert dict(rows) == {"0xa": 100, "0xb": 50} + + assert count_credit_balances(session) == 2 + assert count_credit_balances(session, min_balance=75) == 1 + + +def test_repair_rebuilds_buckets_from_history(session_factory: DbSessionFactory): + """The repair function reproduces buckets from credit_history alone. + Simulates a fresh table by deleting all buckets then running repair. + """ + from aleph.repair import _repair_credit_balances + + dist1 = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + dist2 = dt.datetime(2023, 1, 2, 12, 0, 0, tzinfo=dt.timezone.utc) + expense_ts = dt.datetime(2023, 1, 3, 12, 0, 0, tzinfo=dt.timezone.utc) + exp_dt = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + exp_ms = int(exp_dt.timestamp() * 1000) + + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xrepair", + "amount": 100, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + "expiration": exp_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="repair_dist_a", + message_timestamp=dist1, + ) + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xrepair", + "amount": 200, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + } + ], + token="ALEPH", + chain="ETH", + message_hash="repair_dist_b", + message_timestamp=dist2, + ) + update_credit_balances_expense( + session=session, + credits_list=[{"address": "0xrepair", "amount": 150, "ref": "r"}], + message_hash="repair_exp", + message_timestamp=expense_ts, + ) + session.commit() + + # Capture the expected post-eager state. + expected_bucket_amount = _bucket_amount(session, "0xrepair", INFINITY) + assert expected_bucket_amount == 150 * 10000 + + # Wipe the cache, simulating a freshly migrated DB. + session.execute(delete(AlephCreditBalanceDb)) + session.commit() + + _repair_credit_balances(session_factory) + + with session_factory() as session: + assert _bucket_amount(session, "0xrepair", INFINITY) == expected_bucket_amount + # The expiring bucket was drained first and has no remaining amount, + # so the repair does not write a row for it. + assert _bucket_amount(session, "0xrepair", exp_dt) == 0 + + +def test_repair_is_idempotent(session_factory: DbSessionFactory): + from aleph.repair import _repair_credit_balances + + ts = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xidemp", + "amount": 42, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + } + ], + token="ALEPH", + chain="ETH", + message_hash="idemp_dist", + message_timestamp=ts, + ) + session.commit() + + _repair_credit_balances(session_factory) + _repair_credit_balances(session_factory) + + with session_factory() as session: + assert _bucket_amount(session, "0xidemp", INFINITY) == 42 * 10000 diff --git a/tests/db/test_credit_expiration_task.py b/tests/db/test_credit_expiration_task.py new file mode 100644 index 000000000..f9db543a9 --- /dev/null +++ b/tests/db/test_credit_expiration_task.py @@ -0,0 +1,150 @@ +"""Tests for the background credit-expiration task.""" + +import datetime as dt + +import pytest +from sqlalchemy import select + +from aleph.db.accessors.balances import _apply_grant_bucket +from aleph.db.models import AlephCreditBalanceDb +from aleph.services.credit_expiration import NOTIFY_CHANNEL, CreditExpirationTask +from aleph.toolkit.infinity import INFINITY +from aleph.types.db_session import DbSessionFactory + + +def _make_task(session_factory, mock_config): + """The task only needs the config to open its LISTEN connection. The + sync tests below exercise the helpers without opening that connection. + """ + return CreditExpirationTask(session_factory=session_factory, config=mock_config) + + +@pytest.mark.asyncio +async def test_expiration_task_deletes_expired_buckets( + session_factory: DbSessionFactory, mock_config +): + """A sweep removes buckets whose ``expiration_date`` is in the past, and + leaves both still-valid and sentinel buckets untouched. + """ + past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xtask", 100, past) + _apply_grant_bucket(session, "0xtask", 200, future) + _apply_grant_bucket(session, "0xtask", 50, None) + session.commit() + + _make_task(session_factory, mock_config)._delete_expired() + + with session_factory() as session: + exps = sorted( + row.expiration_date + for row in session.execute( + select(AlephCreditBalanceDb).where( + AlephCreditBalanceDb.address == "0xtask" + ) + ).scalars() + ) + assert past not in exps + assert future in exps + assert INFINITY in exps + + +@pytest.mark.asyncio +async def test_expiration_task_peek_ignores_sentinel( + session_factory: DbSessionFactory, mock_config +): + """The ``MIN(expiration_date)`` peek skips the sentinel so the task + never schedules a wakeup for "never expires". + """ + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xpeek_sentinel", 100, None) + session.commit() + + assert _make_task(session_factory, mock_config)._peek_next_expiration() is None + + with session_factory() as session: + _apply_grant_bucket(session, "0xpeek_finite", 100, future) + session.commit() + + assert _make_task(session_factory, mock_config)._peek_next_expiration() == future + + +@pytest.mark.asyncio +async def test_listen_callback_sets_event_on_notification( + session_factory: DbSessionFactory, mock_config +): + """The selector-side callback drains pending notifications and sets the + task's event. The interaction between a Postgres NOTIFY and the + ``add_reader`` plumbing is end-to-end exercised by integration tests + rather than unit-tested here; this asserts the in-process glue. + """ + + class _FakeConn: + def __init__(self) -> None: + self.notifies = [object()] + + def poll(self) -> None: + pass + + task = _make_task(session_factory, mock_config) + task._listen_conn = _FakeConn() + assert not task._event.is_set() + task._on_listen_readable() + assert task._event.is_set() + # Subsequent poll with no notifications must not flap the event off. + task._listen_conn.notifies = [] + task._event.clear() + task._on_listen_readable() + assert not task._event.is_set() + + +@pytest.mark.asyncio +async def test_writer_emits_notify_on_finite_expiration( + session_factory: DbSessionFactory, mock_config +): + """``_apply_grant_bucket`` issues ``NOTIFY credit_expiration_changed`` in + the same transaction as the bucket insert, so commit delivers the wake. + Sentinel buckets must not produce a NOTIFY. + + Captured via a LISTEN connection opened from the test config; this is + the integration path the expiration task relies on. + """ + import psycopg2.extensions + + from aleph.db.connection import make_db_url + + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + + # Build the snoop DSN from the same config the session_factory uses. + sqla_url = make_db_url(driver="psycopg2", config=mock_config) + # Strip the SQLAlchemy dialect prefix so psycopg2 accepts the URL. + prefix = "postgresql+psycopg2://" + if sqla_url.startswith(prefix): + snoop_dsn = "postgresql://" + sqla_url[len(prefix) :] + else: + snoop_dsn = sqla_url + + snoop = psycopg2.connect(snoop_dsn) + snoop.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + snoop_cur = snoop.cursor() + snoop_cur.execute(f"LISTEN {NOTIFY_CHANNEL};") + try: + # Finite-expiration grant: should emit NOTIFY on commit. + with session_factory() as session: + _apply_grant_bucket(session, "0xnotify_finite", 100, future) + session.commit() + snoop.poll() + assert any(n.channel == NOTIFY_CHANNEL for n in snoop.notifies) + snoop.notifies.clear() + + # Sentinel grant: must NOT emit NOTIFY. + with session_factory() as session: + _apply_grant_bucket(session, "0xnotify_sentinel", 100, None) + session.commit() + snoop.poll() + assert not snoop.notifies + finally: + snoop_cur.close() + snoop.close() diff --git a/tests/message_processing/test_process_stores.py b/tests/message_processing/test_process_stores.py index cbe692c67..09633d78a 100644 --- a/tests/message_processing/test_process_stores.py +++ b/tests/message_processing/test_process_stores.py @@ -19,6 +19,7 @@ from aleph.db.accessors.messages import get_message_by_item_hash from aleph.db.models import ( AlephBalanceDb, + AlephCreditBalanceDb, AlephCreditHistoryDb, MessageDb, MessageStatusDb, @@ -905,16 +906,27 @@ async def test_new_store_message_with_sufficient_credits( ) message.parsed_content = content - # Add sufficient credit balance via credit history (required by get_credit_balance) + # Seed the eager bucket cache so get_credit_balance sees the credits. + # (The credit_history row mirrors what the writer would have stored; + # production goes through update_credit_balances_distribution.) + from aleph.toolkit.infinity import INFINITY + session.add( AlephCreditHistoryDb( address=address, - amount=1000000000, # Large enough to cover 1 day of storage + amount=1000000000, credit_ref="test-credit-ref", credit_index=0, message_timestamp=timestamp_to_datetime(CREDIT_ONLY_CUTOFF_TIMESTAMP), ) ) + session.add( + AlephCreditBalanceDb( + address=address, + expiration_date=INFINITY, + amount=1000000000, + ) + ) session.commit() # Should pass the balance check @@ -1298,11 +1310,13 @@ async def test_legacy_store_with_credit_payment_and_credits( ) message.parsed_content = content - # Add sufficient credit balance via credit history (required by get_credit_balance) + # Seed the eager bucket cache so get_credit_balance sees the credits. + from aleph.toolkit.infinity import INFINITY + session.add( AlephCreditHistoryDb( address=address, - amount=1000000000, # Large enough to cover 1 day of storage + amount=1000000000, credit_ref="test-credit-ref", credit_index=0, message_timestamp=timestamp_to_datetime( @@ -1310,6 +1324,13 @@ async def test_legacy_store_with_credit_payment_and_credits( ), ) ) + session.add( + AlephCreditBalanceDb( + address=address, + expiration_date=INFINITY, + amount=1000000000, + ) + ) session.commit() # Should pass the balance check From c6ae5d9774638735ce349b5358c8557ef4467b5e Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 12 May 2026 22:31:09 +0200 Subject: [PATCH 2/2] refactor: drop credit-balance expiration task The bucket-cache read path filters ``expiration_date > now()``, so expired bucket rows are inert at query time and table-level cleanup is not load-bearing for correctness. The dedicated asyncio task plus LISTEN/NOTIFY plumbing existed only to keep the table compact in real time, which at current scale (one row per address per distinct historical expiration) is not worth the operational complexity: - a dedicated psycopg2 connection outside the SQLAlchemy pool, with no liveness supervision or reconnect on driver hiccup; - a Python-clock vs. server-clock split between ``_delete_expired`` (using ``utc_now()``) and the read predicates (using ``func.now()``); - multi-node herd behaviour: every CCN in a cluster wakes on every NOTIFY and re-runs the same DELETE+peek; - a transactional NOTIFY in every bucket writer. If table bloat becomes a concern later, a periodic ``DELETE FROM credit_balances WHERE expiration_date <= now()`` driven by the existing cron job machinery covers the same need with no long-lived state. Removes ``aleph.services.credit_expiration``, its test module, the ``NOTIFY credit_expiration_changed`` emission in ``_apply_grant_bucket``, and the task wiring in ``commands.py``. The bucket schema, INFINITY sentinel, expiration-date index, and migration are unchanged. --- src/aleph/commands.py | 8 -- src/aleph/db/accessors/balances.py | 8 -- src/aleph/services/credit_expiration.py | 163 ------------------------ tests/db/test_credit_expiration_task.py | 150 ---------------------- 4 files changed, 329 deletions(-) delete mode 100644 src/aleph/services/credit_expiration.py delete mode 100644 tests/db/test_credit_expiration_task.py diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 6a31051e7..720378b40 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -36,7 +36,6 @@ from aleph.repair import repair_node from aleph.services import p2p from aleph.services.cache.node_cache import NodeCache -from aleph.services.credit_expiration import CreditExpirationTask from aleph.services.ipfs import IpfsService from aleph.services.keys import generate_keypair, save_keys from aleph.services.storage.fileystem_engine import FileSystemStorageEngine @@ -225,13 +224,6 @@ async def main(args: List[str]) -> None: tasks.append(cron_job_task(config=config, cron_job=cron_job)) LOGGER.debug("Initialized cron job task") - LOGGER.debug("Initializing credit expiration task") - credit_expiration = CreditExpirationTask( - session_factory=session_factory, config=config - ) - tasks.append(credit_expiration.run()) - LOGGER.debug("Initialized credit expiration task") - LOGGER.debug("Running event loop") await asyncio.gather(*tasks) diff --git a/src/aleph/db/accessors/balances.py b/src/aleph/db/accessors/balances.py index d7af3e231..a07bf9180 100644 --- a/src/aleph/db/accessors/balances.py +++ b/src/aleph/db/accessors/balances.py @@ -276,14 +276,6 @@ def _apply_grant_bucket( ) session.execute(stmt) - # Tell the expiration task that something may need attention. NOTIFY is - # transactional: Postgres queues the payload and only delivers it to - # LISTENers when the surrounding transaction commits, so the wake fires - # iff the bucket actually lands. We only signal for finite expirations; - # the sentinel ``infinity`` never fires. - if bucket_exp != INFINITY: - session.execute(text("NOTIFY credit_expiration_changed")) - def _consume_address_credits( session: DbSession, diff --git a/src/aleph/services/credit_expiration.py b/src/aleph/services/credit_expiration.py deleted file mode 100644 index a3f3ca8ee..000000000 --- a/src/aleph/services/credit_expiration.py +++ /dev/null @@ -1,163 +0,0 @@ -"""Background task that drops expired credit_balances bucket rows. - -Replaces the read-path FIFO walk that previously filtered expired credits -on every balance read. The task sleeps until the next expiration timestamp -present in ``credit_balances`` and is woken via a Postgres ``LISTEN`` on -``credit_expiration_changed``. Writers in any worker process ``NOTIFY`` -that channel inside their transaction; Postgres delivers the notification -after commit so the wake reflects committed state. - -Single instance per CCN process (the main process — message processing -runs in spawned subprocesses, so the in-process ``asyncio.Event`` is *not* -shared across them; the database is the rendezvous point). -""" - -import asyncio -import logging -from typing import Optional - -import psycopg2 -import psycopg2.extensions -from configmanager import Config -from sqlalchemy import delete, func, select - -from aleph.db.connection import make_db_url -from aleph.db.models import AlephCreditBalanceDb -from aleph.toolkit.infinity import INFINITY -from aleph.toolkit.timestamp import utc_now -from aleph.types.db_session import DbSessionFactory - -LOGGER = logging.getLogger(__name__) - -NOTIFY_CHANNEL = "credit_expiration_changed" - -# Fallback timeout when no expiration is pending. Long enough to be cheap, -# short enough that a missed NOTIFY (e.g. transient driver glitch) recovers -# in bounded time. The NOTIFY path is the primary wake mechanism. -_IDLE_TIMEOUT_SECONDS = 60 * 60 # 1 hour - - -class CreditExpirationTask: - """Single long-running task that deletes expired bucket rows. - - Holds a dedicated psycopg2 connection in autocommit mode for the LISTEN - side (separate from the SQLAlchemy pool so the LISTEN doesn't tie up a - regular connection). The connection's fd is registered with the event - loop via ``loop.add_reader``: when Postgres pushes a notification, the - reader callback drains ``connection.notifies`` and sets the local event. - - Each iteration: - 1. ``DELETE FROM credit_balances WHERE expiration_date <= now()``. - 2. ``SELECT MIN(expiration_date)`` excluding the ``infinity`` sentinel. - 3. ``asyncio.wait_for(event.wait(), timeout=delta)`` if there's a - finite expiration, else unbounded ``event.wait()`` until a NOTIFY. - """ - - def __init__(self, session_factory: DbSessionFactory, config: Config) -> None: - self.session_factory = session_factory - self._config = config - self._event = asyncio.Event() - self._listen_conn: Optional[psycopg2.extensions.connection] = None - self._fd: Optional[int] = None - - async def run(self) -> None: - loop = asyncio.get_event_loop() - self._listen_conn = self._open_listen_connection() - self._fd = self._listen_conn.fileno() - loop.add_reader(self._fd, self._on_listen_readable) - LOGGER.info("Credit expiration task started (LISTEN %s)", NOTIFY_CHANNEL) - try: - while True: - await self._tick() - except asyncio.CancelledError: - LOGGER.info("Credit expiration task cancelled") - raise - finally: - if self._fd is not None: - loop.remove_reader(self._fd) - if self._listen_conn is not None: - self._listen_conn.close() - - def _open_listen_connection(self) -> psycopg2.extensions.connection: - dsn = make_db_url( - driver="psycopg2", - config=self._config, - application_name="credit-expiration-listen", - ) - # SQLAlchemy URL is psycopg2-compatible after stripping the dialect prefix. - prefix = "postgresql+psycopg2://" - if dsn.startswith(prefix): - dsn = "postgresql://" + dsn[len(prefix) :] - conn = psycopg2.connect(dsn) - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = conn.cursor() - cur.execute(f"LISTEN {NOTIFY_CHANNEL};") - cur.close() - return conn - - def _on_listen_readable(self) -> None: - """Selector callback: drain pending notifications and wake the task.""" - conn = self._listen_conn - if conn is None: - return - try: - conn.poll() - except Exception: - LOGGER.exception("LISTEN connection poll failed") - return - if conn.notifies: - conn.notifies.clear() - self._event.set() - - async def _tick(self) -> None: - self._delete_expired() - - next_exp = self._peek_next_expiration() - if next_exp is None: - # Nothing pending: wait until a writer NOTIFY-s or the idle - # timeout fires (defence-in-depth re-poll). - try: - await asyncio.wait_for( - self._event.wait(), timeout=_IDLE_TIMEOUT_SECONDS - ) - except asyncio.TimeoutError: - pass - finally: - self._event.clear() - return - - timeout = (next_exp - utc_now()).total_seconds() - if timeout <= 0: - # Already past the next expiration: re-sweep immediately. - return - - try: - await asyncio.wait_for(self._event.wait(), timeout=timeout) - except asyncio.TimeoutError: - pass - finally: - self._event.clear() - - def _peek_next_expiration(self): - with self.session_factory() as session: - return session.execute( - select(func.min(AlephCreditBalanceDb.expiration_date)).where( - AlephCreditBalanceDb.expiration_date > utc_now(), - AlephCreditBalanceDb.expiration_date < INFINITY, - AlephCreditBalanceDb.amount > 0, - ) - ).scalar() - - def _delete_expired(self) -> None: - with self.session_factory() as session: - result = session.execute( - delete(AlephCreditBalanceDb).where( - AlephCreditBalanceDb.expiration_date <= utc_now() - ) - ) - session.commit() - # session.execute(delete(...)) returns a CursorResult at runtime; - # the static Result alias hides ``rowcount`` so cast for mypy. - rowcount = getattr(result, "rowcount", 0) - if rowcount: - LOGGER.info("Dropped %d expired credit bucket(s)", rowcount) diff --git a/tests/db/test_credit_expiration_task.py b/tests/db/test_credit_expiration_task.py deleted file mode 100644 index f9db543a9..000000000 --- a/tests/db/test_credit_expiration_task.py +++ /dev/null @@ -1,150 +0,0 @@ -"""Tests for the background credit-expiration task.""" - -import datetime as dt - -import pytest -from sqlalchemy import select - -from aleph.db.accessors.balances import _apply_grant_bucket -from aleph.db.models import AlephCreditBalanceDb -from aleph.services.credit_expiration import NOTIFY_CHANNEL, CreditExpirationTask -from aleph.toolkit.infinity import INFINITY -from aleph.types.db_session import DbSessionFactory - - -def _make_task(session_factory, mock_config): - """The task only needs the config to open its LISTEN connection. The - sync tests below exercise the helpers without opening that connection. - """ - return CreditExpirationTask(session_factory=session_factory, config=mock_config) - - -@pytest.mark.asyncio -async def test_expiration_task_deletes_expired_buckets( - session_factory: DbSessionFactory, mock_config -): - """A sweep removes buckets whose ``expiration_date`` is in the past, and - leaves both still-valid and sentinel buckets untouched. - """ - past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) - future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) - with session_factory() as session: - _apply_grant_bucket(session, "0xtask", 100, past) - _apply_grant_bucket(session, "0xtask", 200, future) - _apply_grant_bucket(session, "0xtask", 50, None) - session.commit() - - _make_task(session_factory, mock_config)._delete_expired() - - with session_factory() as session: - exps = sorted( - row.expiration_date - for row in session.execute( - select(AlephCreditBalanceDb).where( - AlephCreditBalanceDb.address == "0xtask" - ) - ).scalars() - ) - assert past not in exps - assert future in exps - assert INFINITY in exps - - -@pytest.mark.asyncio -async def test_expiration_task_peek_ignores_sentinel( - session_factory: DbSessionFactory, mock_config -): - """The ``MIN(expiration_date)`` peek skips the sentinel so the task - never schedules a wakeup for "never expires". - """ - future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) - with session_factory() as session: - _apply_grant_bucket(session, "0xpeek_sentinel", 100, None) - session.commit() - - assert _make_task(session_factory, mock_config)._peek_next_expiration() is None - - with session_factory() as session: - _apply_grant_bucket(session, "0xpeek_finite", 100, future) - session.commit() - - assert _make_task(session_factory, mock_config)._peek_next_expiration() == future - - -@pytest.mark.asyncio -async def test_listen_callback_sets_event_on_notification( - session_factory: DbSessionFactory, mock_config -): - """The selector-side callback drains pending notifications and sets the - task's event. The interaction between a Postgres NOTIFY and the - ``add_reader`` plumbing is end-to-end exercised by integration tests - rather than unit-tested here; this asserts the in-process glue. - """ - - class _FakeConn: - def __init__(self) -> None: - self.notifies = [object()] - - def poll(self) -> None: - pass - - task = _make_task(session_factory, mock_config) - task._listen_conn = _FakeConn() - assert not task._event.is_set() - task._on_listen_readable() - assert task._event.is_set() - # Subsequent poll with no notifications must not flap the event off. - task._listen_conn.notifies = [] - task._event.clear() - task._on_listen_readable() - assert not task._event.is_set() - - -@pytest.mark.asyncio -async def test_writer_emits_notify_on_finite_expiration( - session_factory: DbSessionFactory, mock_config -): - """``_apply_grant_bucket`` issues ``NOTIFY credit_expiration_changed`` in - the same transaction as the bucket insert, so commit delivers the wake. - Sentinel buckets must not produce a NOTIFY. - - Captured via a LISTEN connection opened from the test config; this is - the integration path the expiration task relies on. - """ - import psycopg2.extensions - - from aleph.db.connection import make_db_url - - future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) - - # Build the snoop DSN from the same config the session_factory uses. - sqla_url = make_db_url(driver="psycopg2", config=mock_config) - # Strip the SQLAlchemy dialect prefix so psycopg2 accepts the URL. - prefix = "postgresql+psycopg2://" - if sqla_url.startswith(prefix): - snoop_dsn = "postgresql://" + sqla_url[len(prefix) :] - else: - snoop_dsn = sqla_url - - snoop = psycopg2.connect(snoop_dsn) - snoop.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - snoop_cur = snoop.cursor() - snoop_cur.execute(f"LISTEN {NOTIFY_CHANNEL};") - try: - # Finite-expiration grant: should emit NOTIFY on commit. - with session_factory() as session: - _apply_grant_bucket(session, "0xnotify_finite", 100, future) - session.commit() - snoop.poll() - assert any(n.channel == NOTIFY_CHANNEL for n in snoop.notifies) - snoop.notifies.clear() - - # Sentinel grant: must NOT emit NOTIFY. - with session_factory() as session: - _apply_grant_bucket(session, "0xnotify_sentinel", 100, None) - session.commit() - snoop.poll() - assert not snoop.notifies - finally: - snoop_cur.close() - snoop.close()