diff --git a/deployment/migrations/versions/0057_b1c2d3e4f5a6_replace_credit_balances_with_buckets.py b/deployment/migrations/versions/0057_b1c2d3e4f5a6_replace_credit_balances_with_buckets.py new file mode 100644 index 000000000..1ff5048a3 --- /dev/null +++ b/deployment/migrations/versions/0057_b1c2d3e4f5a6_replace_credit_balances_with_buckets.py @@ -0,0 +1,69 @@ +"""Replace credit_balances scalar cache with per-expiration buckets + +Revision ID: b1c2d3e4f5a6 +Revises: 7e5a630e4b36 +Create Date: 2026-05-12 + +The previous credit_balances table held a single FIFO-derived scalar per +address, lazily recomputed on read by an O(N^2) Python walk over +credit_history. This migration replaces it with a bucket cache keyed by +(address, expiration_date), eagerly maintained by the credit_history +writers under a sort-by-expiration policy. The "no expiration" case is +encoded as PG ``'infinity'::timestamptz`` so expiration_date stays NOT NULL +inside the composite primary key, and read queries can use a uniform +``expiration_date > now()`` predicate without IS-NULL branches. + +The previous table is dropped rather than migrated; credit_history is the +source of truth and aleph.repair.repair_credit_balances repopulates the +new table from history on the next startup. +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "b1c2d3e4f5a6" +down_revision = "7e5a630e4b36" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.drop_table("credit_balances") + op.create_table( + "credit_balances", + sa.Column("address", sa.String(), nullable=False), + sa.Column("expiration_date", sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column("amount", sa.BigInteger(), nullable=False), + sa.Column( + "last_update", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint( + "address", "expiration_date", name="credit_balances_pkey" + ), + ) + op.create_index( + "credit_balances_expiration_date_idx", + "credit_balances", + ["expiration_date"], + ) + + +def downgrade() -> None: + op.drop_table("credit_balances") + op.create_table( + "credit_balances", + sa.Column("address", sa.String(), nullable=False), + sa.Column("balance", sa.BigInteger(), nullable=False, server_default="0"), + sa.Column( + "last_update", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint("address", name="credit_balances_pkey"), + ) + op.create_index("ix_credit_balances_address", "credit_balances", ["address"]) diff --git a/src/aleph/db/accessors/balances.py b/src/aleph/db/accessors/balances.py index 85538a5c8..a07bf9180 100644 --- a/src/aleph/db/accessors/balances.py +++ b/src/aleph/db/accessors/balances.py @@ -9,6 +9,7 @@ from aleph_message.models import Chain from sqlalchemy import func, select, text +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.sql import Select from aleph.db.models import AlephBalanceDb, AlephCreditBalanceDb, AlephCreditHistoryDb @@ -16,6 +17,7 @@ CREDIT_PRECISION_CUTOFF_TIMESTAMP, CREDIT_PRECISION_MULTIPLIER, ) +from aleph.toolkit.infinity import INFINITY from aleph.toolkit.timestamp import timestamp_to_datetime, utc_now from aleph.types.db_session import DbSession from aleph.types.sort_order import SortByCreditHistory, SortOrder @@ -228,268 +230,206 @@ def get_updated_balance_accounts(session: DbSession, last_update: dt.datetime): @dataclass -class PositiveCredit: - amount: int +class CreditBalanceDetail: expiration_date: Optional[dt.datetime] - timestamp: dt.datetime - remaining: int + amount: int -@dataclass -class NegativeAmount: - amount: int - timestamp: dt.datetime +def _bucket_expiration(expiration_date: Optional[dt.datetime]) -> dt.datetime: + """Map an optional API-level expiration to the DB sentinel for the bucket PK.""" + return expiration_date if expiration_date is not None else INFINITY -@dataclass -class CreditBalanceDetail: - expiration_date: Optional[dt.datetime] - amount: int +def _api_expiration(expiration_date: dt.datetime) -> Optional[dt.datetime]: + """Inverse of _bucket_expiration: convert the sentinel back to None for callers.""" + return None if expiration_date == INFINITY else expiration_date -def _apply_fifo_consumption( - session: DbSession, address: str, now: Optional[dt.datetime] = None -) -> list[PositiveCredit]: +def _apply_grant_bucket( + session: DbSession, + address: str, + amount: int, + expiration_date: Optional[dt.datetime], +) -> None: + """Add ``amount`` (positive) to the bucket for ``(address, expiration_date)``. + + Inserts the bucket if missing, increments it otherwise. ``amount`` may be + negative to undo a previous grant, but consumption of expenses/transfers + should go through ``_consume_address_credits`` so the soonest-expiring + bucket is drained first. """ - Fetch all positive credits for the address and apply all existing debits via FIFO. + bucket_exp = _bucket_expiration(expiration_date) + stmt = pg_insert(AlephCreditBalanceDb).values( + address=address, + expiration_date=bucket_exp, + amount=amount, + ) + stmt = stmt.on_conflict_do_update( + index_elements=[ + AlephCreditBalanceDb.address, + AlephCreditBalanceDb.expiration_date, + ], + set_={ + "amount": AlephCreditBalanceDb.amount + stmt.excluded.amount, + "last_update": func.now(), + }, + ) + session.execute(stmt) + - Returns the list of PositiveCredit objects with `.remaining` reflecting how many - credits are still unconsumed after all past expenses/transfers. Expired credits are - included in the returned list — callers are responsible for filtering by expiration. +def _consume_address_credits( + session: DbSession, + address: str, + amount: int, + now: Optional[dt.datetime] = None, +) -> List[Tuple[int, dt.datetime]]: + """Drain ``amount`` credits from the address's still-valid buckets, soonest + expiry first. Returns a list of ``(consumed_amount, source_bucket_expiration)`` + in consumption order. The bucket expiration is the DB-stored value (with + sentinel for "no expiration"); callers wanting the API-shaped value should + pass it through ``_api_expiration``. + + ``now`` is the validity cutoff: only buckets whose ``expiration_date`` is + strictly after ``now`` are eligible to be drained. Production callers pass + the expense / transfer ``message_timestamp`` here so a bucket that was + valid at the moment the message was signed remains spendable even if it + has since expired in wall-clock terms. This also keeps the eager-write + path consistent with the repair replay, which uses the historical + ``message_timestamp`` as the cutoff. ``now`` defaults to ``func.now()`` + (evaluated server-side) for callers that genuinely mean "right now". + + Bucket rows are locked ``FOR UPDATE`` to serialise concurrent writers for + the same address. Buckets are not deleted when they reach zero; the + expiration task and the next read both treat zero-amount buckets as inert. + + If ``amount`` exceeds the available credit, the leftover is silently + dropped (matches the prior FIFO behaviour). Going negative is prevented. """ - now = now if now is not None else utc_now() + if amount <= 0: + return [] - records = ( + cutoff = now if now is not None else func.now() + buckets = ( session.execute( - select(AlephCreditHistoryDb) - .where(AlephCreditHistoryDb.address == address) - .order_by(AlephCreditHistoryDb.message_timestamp.asc()) + select(AlephCreditBalanceDb) + .where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date > cutoff, + AlephCreditBalanceDb.amount > 0, + ) + .order_by(AlephCreditBalanceDb.expiration_date.asc()) + .with_for_update() ) .scalars() .all() ) - positive_credits: list[PositiveCredit] = [] - negative_amounts: list[NegativeAmount] = [] - - for record in records: - if record.amount > 0: - positive_credits.append( - PositiveCredit( - amount=record.amount, - expiration_date=record.expiration_date, - timestamp=record.message_timestamp, - remaining=record.amount, - ) - ) - else: - negative_amounts.append( - NegativeAmount( - amount=abs(record.amount), timestamp=record.message_timestamp - ) - ) - - for expense in negative_amounts: - remaining_expense = expense.amount - for credit in positive_credits: - if remaining_expense <= 0: - break - expense_valid = ( - credit.expiration_date is None - or expense.timestamp < credit.expiration_date - ) - if expense_valid and credit.remaining > 0: - consumed = min(credit.remaining, remaining_expense) - credit.remaining -= consumed - remaining_expense -= consumed - - return positive_credits + consumed_log: List[Tuple[int, dt.datetime]] = [] + remaining = amount + for bucket in buckets: + if remaining <= 0: + break + take = min(bucket.amount, remaining) + bucket.amount -= take + remaining -= take + consumed_log.append((take, bucket.expiration_date)) + session.flush() + return consumed_log def _compute_transfer_entries_by_expiration( - remaining_credits: list[PositiveCredit], - amount: int, + consumed_buckets: List[Tuple[int, dt.datetime]], requested_expiration: Optional[dt.datetime], - now: dt.datetime, -) -> list[tuple[int, Optional[dt.datetime]]]: +) -> List[Tuple[int, Optional[dt.datetime]]]: + """For each consumed source bucket, produce the matching recipient entry, + capping the recipient's expiration at the more-restrictive of source vs. + requested. Adjacent entries with the same effective expiration are merged + so a recipient never sees more granularity than necessary. + + The cap rule (``min(source, requested)``) prevents an attacker re-transferring + expiring credits with a longer requested expiration to bypass the original + expiry. Whitelisted senders skip this path entirely (see caller). """ - Simulate consuming `amount` from `remaining_credits` (FIFO order) and return a list - of (portion_amount, effective_expiration) pairs. - - Credits are consumed in the same FIFO order used by the balance calculation, so the - expiration assignment for the recipient is consistent with the sender's accounting. - - The effective expiration for each portion is: - min(source_credit.expiration_date, requested_expiration) - where None means no expiration. - - Adjacent portions with the same effective expiration are merged into one entry. - This prevents a re-transfer from extending or removing the original expiration - constraint placed on the source credits. - """ - result: list[tuple[int, Optional[dt.datetime]]] = [] - remaining_to_consume = amount - - for credit in remaining_credits: - if remaining_to_consume <= 0: - break - if credit.expiration_date is not None and credit.expiration_date <= now: - continue - if credit.remaining <= 0: - continue - - consumed = min(credit.remaining, remaining_to_consume) - remaining_to_consume -= consumed - - # Effective expiration: most restrictive of source and requested - if credit.expiration_date is not None: - effective_exp: Optional[dt.datetime] = ( - credit.expiration_date - if requested_expiration is None - else min(credit.expiration_date, requested_expiration) - ) + result: List[Tuple[int, Optional[dt.datetime]]] = [] + for consumed, source_exp in consumed_buckets: + api_source_exp = _api_expiration(source_exp) + + if api_source_exp is None: + effective_exp: Optional[dt.datetime] = requested_expiration + elif requested_expiration is None: + effective_exp = api_source_exp else: - effective_exp = requested_expiration + effective_exp = min(api_source_exp, requested_expiration) - # Merge with previous entry if same effective expiration if result and result[-1][1] == effective_exp: result[-1] = (result[-1][0] + consumed, effective_exp) else: result.append((consumed, effective_exp)) - return result -def _calculate_credit_balance_fifo( - session: DbSession, address: str, now: Optional[dt.datetime] = None -) -> int: - """ - Calculate credit balance using FIFO consumption strategy. - - This function implements the core FIFO logic: - 1. Get all positive credits (ordered by message_timestamp) - 2. Get all negative amounts (expenses/transfers) - 3. Apply negative amounts to oldest credits first, but only if the expense - occurred before the credit's expiration date - 4. Return remaining balance considering current expiration status - """ - now = now if now is not None else utc_now() - positive_credits = _apply_fifo_consumption(session, address, now) - total_balance = sum( - c.remaining - for c in positive_credits - if c.expiration_date is None or c.expiration_date > now - ) - return max(0, total_balance) - - -def get_credit_balance_with_details( - session: DbSession, address: str, now: Optional[dt.datetime] = None -) -> Tuple[int, List[CreditBalanceDetail]]: - """ - Calculate credit balance with a breakdown by expiration date. - - Returns (total_balance, details) where details is a list of - CreditBalanceDetail grouped by expiration_date, sorted with - non-expiring (None) first, then by expiration_date ascending. +def _credit_balance_amount_expr(): + """Reusable SQL expression: per-address sum of non-expired bucket amounts. - Always recalculates (bypasses cache) since details are not cached. + Uses ``func.now()`` so the cutoff is evaluated server-side at statement + execution time rather than at expression-construction time in Python. """ - now = now if now is not None else utc_now() - positive_credits = _apply_fifo_consumption(session, address, now) - - details_map: Dict[Optional[dt.datetime], int] = {} - total_balance = 0 - for credit in positive_credits: - if credit.expiration_date is None or credit.expiration_date > now: - if credit.remaining > 0: - total_balance += credit.remaining - key = credit.expiration_date - details_map[key] = details_map.get(key, 0) + credit.remaining - - # Sort: non-expiring first (None), then by expiration_date ascending - details = [ - CreditBalanceDetail(expiration_date=k, amount=v) - for k, v in sorted( - details_map.items(), - key=lambda x: (x[0] is not None, x[0] or dt.datetime.min), - ) - ] - - return max(0, total_balance), details + return func.coalesce( + func.sum(AlephCreditBalanceDb.amount).filter( + AlephCreditBalanceDb.expiration_date > func.now() + ), + 0, + ) def get_credit_balance( session: DbSession, address: str, now: Optional[dt.datetime] = None ) -> int: - """ - Get credit balance using lazy recalculation strategy. + """Sum of non-expired bucket amounts for ``address``, floored at zero. - 1. Check if cached balance exists in credit_balances table - 2. Check if credit_history has newer entries than cached balance - 3. Check if any credits have expiration dates that occurred after the cache's last update - 4. If recalculation is needed, recalculate using FIFO and update cache - 5. Return cached balance + Pure read: no FIFO walk, no write-back. The bucket cache is maintained by + the credit_history writers and by the expiration task. """ - - now = now if now is not None else utc_now() - - # Get the timestamp of the most recent credit history entry for this address - latest_history_timestamp = session.execute( - select(func.max(AlephCreditHistoryDb.last_update)).where( - AlephCreditHistoryDb.address == address + cutoff = now if now is not None else func.now() + result = session.execute( + select(func.coalesce(func.sum(AlephCreditBalanceDb.amount), 0)).where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date > cutoff, ) ).scalar() + return max(0, int(result or 0)) - # If no history exists, balance is 0 - if latest_history_timestamp is None: - return 0 - - # Get cached balance if it exists - cached_balance = session.execute( - select(AlephCreditBalanceDb).where(AlephCreditBalanceDb.address == address) - ).scalar_one_or_none() - - # Check if recalculation is needed - needs_recalculation = ( - cached_balance is None or cached_balance.last_update < latest_history_timestamp - ) - - # Also check if any credits have expiration dates that occurred after the cache's last update - # This handles the case where credits expired since the last cache update - if not needs_recalculation and cached_balance is not None: - # Check for any credits with expiration dates between cache last_update and now - earliest_expiration_after_cache = session.execute( - select(func.min(AlephCreditHistoryDb.expiration_date)).where( - (AlephCreditHistoryDb.address == address) - & (AlephCreditHistoryDb.expiration_date.isnot(None)) - & (AlephCreditHistoryDb.expiration_date > cached_balance.last_update) - & (AlephCreditHistoryDb.expiration_date <= now) - ) - ).scalar() - - needs_recalculation = earliest_expiration_after_cache is not None - if needs_recalculation: - # Recalculate balance using FIFO - new_balance = _calculate_credit_balance_fifo(session, address, now) +def get_credit_balance_with_details( + session: DbSession, address: str, now: Optional[dt.datetime] = None +) -> Tuple[int, List[CreditBalanceDetail]]: + """Per-expiration-date breakdown of an address's non-expired credits. - if cached_balance is None: - # Create new cache entry - session.add( - AlephCreditBalanceDb( - address=address, balance=new_balance, last_update=now - ) - ) - else: - # Update existing cache entry - cached_balance.balance = new_balance - cached_balance.last_update = now + Returns ``(total, details)`` where details are sorted with the + non-expiring bucket first (sentinel mapped back to ``None``), then by + expiration ascending. Zero-amount buckets are filtered out. + """ + cutoff = now if now is not None else func.now() + rows = session.execute( + select(AlephCreditBalanceDb.expiration_date, AlephCreditBalanceDb.amount).where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date > cutoff, + AlephCreditBalanceDb.amount > 0, + ) + ).all() - session.flush() - return new_balance + pairs = [(row.expiration_date, int(row.amount)) for row in rows] + total = max(0, sum(amount for _, amount in pairs)) - return cached_balance.balance if cached_balance else 0 + details = [ + CreditBalanceDetail(expiration_date=_api_expiration(exp), amount=amount) + for exp, amount in sorted( + pairs, + # Sentinel (no expiration) first, then by expiration ascending. + key=lambda x: (x[0] != INFINITY, x[0]), + ) + ] + return total, details def get_credit_balances( @@ -500,16 +440,17 @@ def get_credit_balances( after_address: Optional[str] = None, cursor_mode: bool = False, ) -> list[tuple[str, int]]: + """Paginated list of (address, balance) across all addresses with a + positive non-expired sum. """ - Get paginated credit balances for all addresses. - Uses the cached balances from the credit_balances table. - """ - query = select(AlephCreditBalanceDb.address, AlephCreditBalanceDb.balance) + balance_expr = _credit_balance_amount_expr().label("balance") - if min_balance > 0: - query = query.filter(AlephCreditBalanceDb.balance >= min_balance) - - query = query.order_by(AlephCreditBalanceDb.address.asc()) + query = ( + select(AlephCreditBalanceDb.address, balance_expr) + .group_by(AlephCreditBalanceDb.address) + .having(balance_expr >= max(min_balance, 1)) + .order_by(AlephCreditBalanceDb.address.asc()) + ) if after_address is not None: query = query.where(AlephCreditBalanceDb.address > after_address) @@ -522,22 +463,20 @@ def get_credit_balances( if pagination: query = query.limit(pagination) - # Return results in the expected format (address, credits) - results = session.execute(query).all() - return [(row.address, row.balance) for row in results] + return [(row.address, int(row.balance)) for row in session.execute(query).all()] def count_credit_balances(session: DbSession, min_balance: int = 0) -> int: - """ - Count addresses with credit balances. - Uses the cached balances from the credit_balances table. - """ - query = select(func.count(AlephCreditBalanceDb.address)) - - if min_balance > 0: - query = query.filter(AlephCreditBalanceDb.balance >= min_balance) - - return session.execute(query).scalar_one() + """Count of addresses with a positive non-expired sum (or matching + ``min_balance``).""" + balance_expr = _credit_balance_amount_expr().label("balance") + sub = ( + select(AlephCreditBalanceDb.address) + .group_by(AlephCreditBalanceDb.address) + .having(balance_expr >= max(min_balance, 1)) + .subquery() + ) + return session.execute(select(func.count()).select_from(sub)).scalar_one() def _format_csv_row(*fields) -> str: @@ -624,11 +563,8 @@ def update_credit_balances_distribution( message_hash: str, message_timestamp: dt.datetime, ) -> None: - """ - Updates credit balances for distribution messages (aleph_credit_distribution). - - Distribution messages include all fields like price, bonus_amount, tx_hash, provider, - payment_method, token, chain, and expiration_date. + """Apply a distribution message: grant each entry's bucket and append the + matching credit_history rows. """ last_update = utc_now() @@ -642,20 +578,20 @@ def update_credit_balances_distribution( tx_hash = credit_entry["tx_hash"] provider = credit_entry["provider"] - # Extract optional fields from each credit entry expiration_timestamp = credit_entry.get("expiration") or None origin = credit_entry.get("origin", "") origin_ref = credit_entry.get("ref", "") payment_method = credit_entry.get("payment_method", "") bonus_amount = credit_entry.get("bonus_amount", "") - # Convert expiration timestamp to datetime expiration_date = ( dt.datetime.fromtimestamp(expiration_timestamp / 1000, tz=dt.timezone.utc) if expiration_timestamp is not None else None ) + _apply_grant_bucket(session, address, amount, expiration_date) + csv_rows.append( _format_csv_row( address, @@ -686,15 +622,12 @@ def update_credit_balances_expense( message_hash: str, message_timestamp: dt.datetime, ) -> None: - """ - Updates credit balances for expense messages (aleph_credit_expense). - - Expense messages have negative amounts and can include: - - execution_id (mapped to origin) - - node_id (mapped to tx_hash) - - price (mapped to price) - - time (skipped for now) - - ref (mapped to origin_ref) + """Apply an expense message: drain each entry's amount from the soonest- + expiring non-expired buckets, then append the matching credit_history rows. + + The history row's negative ``amount`` reflects the message intent. If the + address is under-funded, fewer credits are actually consumed (buckets cannot + go negative), matching the prior FIFO behaviour. """ last_update = utc_now() @@ -703,19 +636,18 @@ def update_credit_balances_expense( for index, credit_entry in enumerate(credits_list): address = credit_entry["address"] raw_amount = int(credit_entry["amount"]) - amount = -_apply_credit_precision_multiplier(raw_amount, message_timestamp) + amount = _apply_credit_precision_multiplier(raw_amount, message_timestamp) origin_ref = credit_entry.get("ref", "") - - # Map new fields origin = credit_entry.get("execution_id", "") tx_hash = credit_entry.get("node_id", "") price = credit_entry.get("price", "") - # Skip time field for now + + _consume_address_credits(session, address, amount, now=message_timestamp) csv_rows.append( _format_csv_row( address, - amount, + -amount, message_hash, index, message_timestamp, @@ -744,21 +676,13 @@ def update_credit_balances_transfer( message_hash: str, message_timestamp: dt.datetime, ) -> None: - """ - Updates credit balances for transfer messages (aleph_credit_transfer). - - Transfer messages involve two entries per transfer: - - One or more positive entries for the recipient (adding credits) - - One negative entry for the sender (subtracting credits) - - When a non-whitelisted sender re-transfers credits they received with an expiration - date, the recipient's credits are capped at the original expiration — preventing - bypass of expiration constraints through re-transfers. If the sender's credits have - mixed expirations, multiple positive entries are created for the recipient (one per - expiration group). + """Apply a transfer message: drain the sender's buckets (soonest-expiring + first), grant the resulting amounts to the recipient(s) with each portion + capped at ``min(source_expiration, requested_expiration)``, and append the + matching credit_history rows. - Special case: If sender is in the whitelisted addresses, only add credits to recipient - using the requested expiration as-is (whitelisted senders create credits from nothing). + Whitelisted senders create credits from nothing: the sender is not debited + and the recipient is granted ``amount`` with the requested expiration as-is. """ last_update = utc_now() @@ -766,18 +690,12 @@ def update_credit_balances_transfer( index = 0 is_whitelisted = sender_address in whitelisted_addresses - # Compute sender's remaining credits once for all entries in this transfer - sender_remaining: list[PositiveCredit] = [] - if not is_whitelisted: - sender_remaining = _apply_fifo_consumption(session, sender_address, last_update) - for credit_entry in credits_list: recipient_address = credit_entry["address"] raw_amount = int(credit_entry["amount"]) amount = _apply_credit_precision_multiplier(raw_amount, message_timestamp) expiration_timestamp = credit_entry.get("expiration") or None - # Convert expiration timestamp to datetime requested_expiration = ( dt.datetime.fromtimestamp(expiration_timestamp / 1000, tz=dt.timezone.utc) if expiration_timestamp is not None @@ -785,21 +703,29 @@ def update_credit_balances_transfer( ) if is_whitelisted: - # Whitelisted senders are not constrained by source credits - entries: list[tuple[int, Optional[dt.datetime]]] = [ + entries: List[Tuple[int, Optional[dt.datetime]]] = [ (amount, requested_expiration) ] else: + consumed = _consume_address_credits( + session, sender_address, amount, now=message_timestamp + ) entries = _compute_transfer_entries_by_expiration( - sender_remaining, amount, requested_expiration, last_update + consumed, requested_expiration ) - # Fallback for edge cases where sender credits are not tracked - # (e.g. whitelisted distributions not recorded in history) + # Production transfers are gated by validate_credit_transfer_balance, + # so consumed should sum to ``amount``. Fall back to a single + # ``(amount, requested_expiration)`` entry whenever it doesn't (under- + # funded test scenarios or zero-amount transfers) so the recipient + # still receives a history row matching the message intent. The + # bucket grant below is a no-op for zero-amount entries. if not entries: entries = [(amount, requested_expiration)] - # Add positive entries for recipient (one per expiration group) for entry_amount, entry_expiration in entries: + _apply_grant_bucket( + session, recipient_address, entry_amount, entry_expiration + ) csv_rows.append( _format_csv_row( recipient_address, @@ -822,8 +748,6 @@ def update_credit_balances_transfer( ) index += 1 - # Add negative entry for sender (unless sender is in whitelisted addresses) - # (origin = recipient, provider = ALEPH, payment_method = credit_transfer) if not is_whitelisted: csv_rows.append( _format_csv_row( diff --git a/src/aleph/db/connection.py b/src/aleph/db/connection.py index c42921869..47f57e77e 100644 --- a/src/aleph/db/connection.py +++ b/src/aleph/db/connection.py @@ -7,8 +7,14 @@ from sqlalchemy.orm import sessionmaker from aleph.config import get_config +from aleph.toolkit.infinity import register_infinity_adapter from aleph.types.db_session import DbSessionFactory +# Make psycopg2 emit PG ``infinity::timestamptz`` for the credit-balance +# expiration sentinel. The adapter is global to psycopg2, so registering once +# at module import time covers every connection opened later in this process. +register_infinity_adapter() + def make_db_url( driver: str, config: Config, application_name: Optional[str] = None diff --git a/src/aleph/db/models/balances.py b/src/aleph/db/models/balances.py index c4ef27d6b..b7e6b6056 100644 --- a/src/aleph/db/models/balances.py +++ b/src/aleph/db/models/balances.py @@ -70,10 +70,20 @@ class AlephCreditHistoryDb(Base): class AlephCreditBalanceDb(Base): + """Eagerly-maintained credit balance cache, one row per (address, expiration_date). + + For credits without an expiration, ``aleph.toolkit.infinity.INFINITY`` + (PG ``'infinity'::timestamptz``) is used so the primary key columns can + stay NOT NULL while reads still treat the bucket as never expiring. + """ + __tablename__ = "credit_balances" - address: Mapped[str] = mapped_column(String, primary_key=True, index=True) - balance: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0) + address: Mapped[str] = mapped_column(String, primary_key=True) + expiration_date: Mapped[dt.datetime] = mapped_column( + TIMESTAMP(timezone=True), primary_key=True + ) + amount: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0) last_update: Mapped[dt.datetime] = mapped_column( TIMESTAMP(timezone=True), nullable=False, diff --git a/src/aleph/repair.py b/src/aleph/repair.py index 2f939140a..2569f3611 100644 --- a/src/aleph/repair.py +++ b/src/aleph/repair.py @@ -1,11 +1,14 @@ import logging from aleph_message.models import ItemHash -from sqlalchemy import select +from sqlalchemy import delete, select +from sqlalchemy.dialects.postgresql import insert as pg_insert from aleph.db.accessors.files import upsert_file -from aleph.db.models import StoredFileDb +from aleph.db.models import AlephCreditBalanceDb, AlephCreditHistoryDb, StoredFileDb from aleph.storage import StorageService +from aleph.toolkit.infinity import INFINITY +from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSession, DbSessionFactory LOGGER = logging.getLogger(__name__) @@ -46,6 +49,93 @@ async def _fix_file_sizes( ) +def _rebuild_credit_buckets_for_address(session: DbSession, address: str) -> None: + """Replay ``credit_history`` chronologically under the sort-by-expiration + policy and replace this address's buckets with the resulting state. + + Idempotent: clears existing buckets for the address first, then rebuilds. + Safe to interrupt at address granularity (callers commit per-address). + """ + session.execute( + delete(AlephCreditBalanceDb).where(AlephCreditBalanceDb.address == address) + ) + + records = ( + session.execute( + select(AlephCreditHistoryDb) + .where(AlephCreditHistoryDb.address == address) + .order_by( + AlephCreditHistoryDb.message_timestamp.asc(), + AlephCreditHistoryDb.credit_ref.asc(), + AlephCreditHistoryDb.credit_index.asc(), + ) + ) + .scalars() + .all() + ) + + # Bucket state in memory: {expiration_date (sentinel for None): amount}. + buckets: dict = {} + + def _bucket_key(expiration): + return expiration if expiration is not None else INFINITY + + for record in records: + if record.amount > 0: + key = _bucket_key(record.expiration_date) + buckets[key] = buckets.get(key, 0) + record.amount + else: + remaining = -int(record.amount) + # Soonest-expiring first, ignoring buckets that have already expired + # at the moment of the historical expense. + keys_in_order = sorted(buckets.keys()) + for key in keys_in_order: + if remaining <= 0: + break + if key <= record.message_timestamp: + continue + available = buckets[key] + if available <= 0: + continue + take = min(available, remaining) + buckets[key] = available - take + remaining -= take + + now = utc_now() + rows = [ + {"address": address, "expiration_date": key, "amount": amount} + for key, amount in buckets.items() + if amount > 0 and key > now + ] + if rows: + session.execute(pg_insert(AlephCreditBalanceDb).values(rows)) + + +def _repair_credit_balances(session_factory: DbSessionFactory) -> None: + """Bootstrap or repair the credit_balances bucket cache from credit_history. + + Rebuilds buckets for every address that has any credit_history rows. This is + idempotent and runs on every startup; after the initial bootstrap it's a + bounded full-table scan plus per-address rebuild, which is acceptable given + typical address counts. + """ + with session_factory() as session: + addresses = list( + session.execute(select(AlephCreditHistoryDb.address).distinct()).scalars() + ) + + LOGGER.info("Repairing credit_balances for %d address(es)", len(addresses)) + + for i, address in enumerate(addresses): + with session_factory() as session: + _rebuild_credit_buckets_for_address(session, address) + session.commit() + if (i + 1) % 500 == 0: + LOGGER.info("Repaired %d / %d", i + 1, len(addresses)) + + LOGGER.info("Credit balances repair complete (%d address(es))", len(addresses)) + + async def repair_node( storage_service: StorageService, session_factory: DbSessionFactory ): @@ -53,3 +143,6 @@ async def repair_node( with session_factory() as session: await _fix_file_sizes(session, storage_service, store_files=True) session.commit() + + LOGGER.info("Repairing credit balances") + _repair_credit_balances(session_factory) diff --git a/src/aleph/toolkit/infinity.py b/src/aleph/toolkit/infinity.py new file mode 100644 index 000000000..fedef9d54 --- /dev/null +++ b/src/aleph/toolkit/infinity.py @@ -0,0 +1,103 @@ +"""``INFINITY`` sentinel for the credit_balances expiration column. + +The bucket cache models "no expiration" as a real timestamptz with PG's +``infinity`` value, not NULL. That keeps the composite primary key columns +NOT NULL and lets every read query use the same ``expiration_date > NOW()`` +shape without an ``IS NULL`` branch. + +The Python side uses a ``datetime`` subclass so we can: + + * register a psycopg2 adapter that intercepts just this sentinel and + emits ``'infinity'::timestamptz`` on the wire; regular ``datetime`` + values keep the default adapter; and + * register a typecaster that decodes PG ``infinity::timestamptz`` back to + an ``_Infinity`` instance (rather than a plain ``datetime(9999, ...)``) + so subsequent ORM round-trips serialise it as PG ``infinity`` again. + Without the typecaster the ORM would track a regular datetime for the + PK and re-serialise it as a year-9999 literal, which is *not* equal to + PG ``infinity`` at the SQL level and would cause UPDATE WHERE PK to + miss the row (StaleDataError). + +Round-trip assumption: SQLAlchemy's ``DateTime`` result processor for the +psycopg2 dialect is a passthrough, so the ``_Infinity`` subclass produced +by our typecaster reaches the ORM unchanged and the outbound adapter sees +``_Infinity`` (not a normalised ``datetime``) on the UPDATE path. If a +future SQLAlchemy version starts coercing timestamptz values during result +processing, the symptom would be loud (StaleDataError on bucket UPDATE), +not silent corruption; the fallback would be to expose this sentinel via a +custom ``TypeDecorator`` that coerces year-9999/us-999999 values back to +``_Infinity`` on load. +""" + +import datetime as dt + +import psycopg2.extensions + +_TIMESTAMPTZ_OID = 1184 # pg_type.oid for timestamptz + + +class _Infinity(dt.datetime): + """``datetime`` subclass whose only canonical instance is ``INFINITY``. + + Subclassing ``datetime`` lets the value participate in normal comparison + and arithmetic with other datetimes (it sorts after every representable + timestamptz), while remaining a distinct type for adapter dispatch. + """ + + __slots__ = () + + def __new__(cls) -> "_Infinity": + # Matches what psycopg2 returns when decoding PG ``infinity::timestamptz`` + # so reads from the DB compare equal to this sentinel via datetime equality. + return dt.datetime.__new__( + cls, 9999, 12, 31, 23, 59, 59, 999999, tzinfo=dt.timezone.utc + ) + + def __repr__(self) -> str: + return "INFINITY" + + +INFINITY: _Infinity = _Infinity() + + +def _adapt_infinity(_value: _Infinity) -> psycopg2.extensions.AsIs: + return psycopg2.extensions.AsIs("'infinity'::timestamptz") + + +_DEFAULT_TIMESTAMPTZ = psycopg2.extensions.PYDATETIMETZ + + +def _cast_timestamptz(value, cur): + """Replacement typecaster for OID 1184 that decodes PG ``infinity`` to ``INFINITY``.""" + if value == "infinity": + return INFINITY + return _DEFAULT_TIMESTAMPTZ(value, cur) + + +_INFINITY_AWARE_TIMESTAMPTZ = psycopg2.extensions.new_type( + (_TIMESTAMPTZ_OID,), + "TIMESTAMPTZ_INFINITY", + _cast_timestamptz, +) + + +_registered = False + + +def register_infinity_adapter() -> None: + """Register psycopg2 to round-trip ``INFINITY`` <-> PG ``infinity::timestamptz``. + + * Outbound: ``_Infinity`` instances serialise to ``'infinity'::timestamptz``. + * Inbound: ``timestamptz infinity`` values decode to ``INFINITY`` (the + ``_Infinity`` singleton), so the ORM tracks a value that re-serialises + as PG ``infinity`` on subsequent UPDATE statements. + + Idempotent. Safe to call at module-import time before any connections + are opened; adapters and typecasters are global to the psycopg2 module. + """ + global _registered + if _registered: + return + psycopg2.extensions.register_adapter(_Infinity, _adapt_infinity) + psycopg2.extensions.register_type(_INFINITY_AWARE_TIMESTAMPTZ) + _registered = True diff --git a/tests/db/test_credit_balances.py b/tests/db/test_credit_balances.py index 8cbffe840..97582a0b1 100644 --- a/tests/db/test_credit_balances.py +++ b/tests/db/test_credit_balances.py @@ -3,7 +3,7 @@ from decimal import Decimal from typing import Any, Dict, List -from sqlalchemy import select +from sqlalchemy import delete, select from sqlalchemy import update as sql_update from aleph.db.accessors.balances import ( @@ -618,19 +618,24 @@ def test_balance_fix_doesnt_affect_valid_credits(session_factory: DbSessionFacto assert balance > 0 -def test_fifo_scenario_1_non_expiring_first_equals_0_remaining( +def test_fifo_scenario_1_non_expiring_first_equals_500_remaining( session_factory: DbSessionFactory, ): """ - FIFO Scenario 1: Non-expiring credits received FIRST → Result: 0 remaining + Sort-by-expiration Scenario 1: non-expiring grant FIRST, expiring grant SECOND. Setup: - 1000 non-expiring credits (received FIRST at T1) - 1000 expiring credits (received SECOND at T2, expire at T4) - 1500 expense at T3 (before expiration at T4) - FIFO Consumption: 1000 (non-expiring) + 500 (expiring) = 1500 total consumed - Final Balance: 0 (non-expiring remaining) + 500 (expiring remaining but expired) = 0 + Sort-by-expiration consumption: the expiring bucket has the earlier + expiration_date and is drained first (1000), then 500 from the non-expiring + bucket. After expiration the expiring bucket is dropped by the cron, so the + final balance is the 500 surviving in the non-expiring bucket. + + Under the previous sort-by-issuance FIFO this test asserted 0; the policy + change makes the user lose fewer credits to expiration. """ # Use fixed timestamps before the credit precision cutoff (2026-02-02) @@ -721,8 +726,9 @@ def test_fifo_scenario_1_non_expiring_first_equals_0_remaining( session, "0xcorner_case_user", now_time ) - # Expected: 0 remaining (all non-expiring consumed, expiring remainder expired) - expected_balance = 0 + # Expected: 500 * 10000 remaining in the non-expiring bucket, because + # the expiring bucket was drained first under sort-by-expiration. + expected_balance = 500 * 10000 assert ( balance_after_expiration == expected_balance ), f"Scenario 1: Expected {expected_balance} remaining credits, got {balance_after_expiration}" @@ -838,97 +844,39 @@ def test_fifo_scenario_2_expiring_first_equals_500_remaining( ), f"Scenario 2: Expected {expected_balance} remaining credits, got {balance_after_expiration}" -def test_cache_invalidation_on_credit_expiration(session_factory: DbSessionFactory): +def test_expired_credits_filtered_out_by_read(session_factory: DbSessionFactory): + """A bucket whose ``expiration_date`` is in the past is not counted by + ``get_credit_balance``. This replaces the prior cache-invalidation test: + there is no lazy recompute; the read query just filters by + ``expiration_date > now()``. """ - Test that cached balances are recalculated when credits expire. - - This test covers the bug where cached balances were not being recalculated - when credits expired after the cache was last updated. - - Bug scenario: - 1. Credit with expiration date X is added at time T1 - 2. Cache is calculated at time T2 (where T1 < T2 < X) - 3. Current time is T3 (where T2 < X < T3, so credit has expired) - 4. Without the fix, cached balance would be returned incorrectly - 5. With the fix, balance is recalculated because credit expired after cache update - """ - - # Use fixed timestamps before the credit precision cutoff (2026-02-02) - # to ensure the 10000x multiplier is applied consistently - # Base time (T3): 2023-06-15 12:00:00 UTC - base_time = 1686830400 - - # Time T1: Add credit (1 hour before base_time) + base_time = 1686830400 # 2023-06-15 12:00:00 UTC (pre precision cutoff) credit_time = dt.datetime.fromtimestamp(base_time - 3600, tz=dt.timezone.utc) - - # Time T2: Cache calculation time (30 minutes before base_time, before expiration) - cache_time = dt.datetime.fromtimestamp(base_time - 1800, tz=dt.timezone.utc) - - # Time X: Credit expiration (5 minutes before base_time, between cache time and T3) - expiration_time = int((base_time - 300) * 1000) - - # Time T3: Current time for final balance check (after expiration) + expiration_time_ms = int((base_time - 300) * 1000) # expired 5 minutes before now now_time = dt.datetime.fromtimestamp(base_time, tz=dt.timezone.utc) with session_factory() as session: - # Step 1: Add credit with expiration date - credits_list = [ - { - "address": "0xcache_bug_user", - "amount": 1000, - "price": "1.0", - "tx_hash": "0xcache_test", - "provider": "test_provider", - "expiration": expiration_time, # Will expire at T3 - } - ] - update_credit_balances_distribution( session=session, - credits_list=credits_list, + credits_list=[ + { + "address": "0xexpired_user", + "amount": 1000, + "price": "1.0", + "tx_hash": "0xexpired", + "provider": "test_provider", + "expiration": expiration_time_ms, + } + ], token="TEST", chain="ETH", - message_hash="cache_expiration_test", - message_timestamp=credit_time, # T1 - ) - session.commit() - - # Step 2: Simulate cache being calculated at T2 (before expiration) - # Mock utc_now to return cache_time during first balance calculation - balance_before_expiration = get_credit_balance( - session, "0xcache_bug_user", cache_time + message_hash="expired_credits_test", + message_timestamp=credit_time, ) session.commit() - # Verify that at T2, the balance was 10000000 (1000 * 10000 multiplier, credit not yet expired) - assert balance_before_expiration == 10000000 - - # Verify that a cache entry was created and manually update its timestamp - # to simulate it being created at T2 (cache_time) - - cached_balance = session.execute( - select(AlephCreditBalanceDb).where( - AlephCreditBalanceDb.address == "0xcache_bug_user" - ) - ).scalar_one_or_none() - - assert cached_balance is not None - assert cached_balance.balance == 10000000 - assert cached_balance.last_update == cache_time - - # Step 3: Now check balance at current time (T3, after expiration) - # The fix should detect that credit expired after cache update and recalculate - balance_after_expiration = get_credit_balance( - session, "0xcache_bug_user", now_time - ) - - # Expected: 0 (credit has expired) - assert balance_after_expiration == 0 - - # Verify that cache was updated (should have a newer timestamp) - session.refresh(cached_balance) - assert cached_balance.balance == 0 - assert cached_balance.last_update == now_time + # Bucket row exists with the past expiration, but the read filters it out. + assert get_credit_balance(session, "0xexpired_user", now_time) == 0 def test_get_resource_consumed_credits_no_records(session_factory: DbSessionFactory): @@ -1550,9 +1498,35 @@ def test_chain_transfer_a_b_c_expiration_and_balances( def _insert_credit_history_entries(session, entries: List[Dict[str, Any]]): - """Helper to bulk-insert credit history rows for testing.""" + """Helper to bulk-insert credit history rows for testing. + + Also seeds the bucket cache so ``get_credit_balance(_with_details)`` see + the credits without going through the message writers. Positive amounts + add to a bucket at the entry's expiration_date; negative amounts drain + soonest-first from existing buckets, matching the writer semantics. + """ + from aleph.db.accessors.balances import ( + _apply_grant_bucket, + _consume_address_credits, + ) + for entry in entries: session.add(AlephCreditHistoryDb(**entry)) + amount = int(entry["amount"]) + if amount > 0: + _apply_grant_bucket( + session, entry["address"], amount, entry.get("expiration_date") + ) + else: + # Use the entry's message_timestamp as the validity cutoff so + # backdated test scenarios drain buckets that were valid at the + # historical expense time (matching the writer's eager-write path). + _consume_address_credits( + session, + entry["address"], + -amount, + now=entry.get("message_timestamp"), + ) session.flush() @@ -1649,12 +1623,18 @@ def test_credit_balance_details_mixed_expiration(session_factory: DbSessionFacto def test_credit_balance_details_partial_consumption(session_factory: DbSessionFactory): - """Details are accurate after FIFO partial consumption.""" + """Details are accurate after partial consumption under sort-by-expiration. + + Setup: 1000 non-expiring + 500 expiring at exp1, expense 700. + Sort-by-expiration drains the expiring (soonest) bucket first: 500 from + exp1, then 200 from non-expiring. Only the non-expiring bucket has a + positive amount left, so ``details`` is a single entry. + """ ts = dt.datetime(2026, 3, 1, tzinfo=dt.timezone.utc) exp1 = dt.datetime(2026, 9, 1, tzinfo=dt.timezone.utc) entries = [ - # Non-expiring credit (oldest, consumed first by FIFO) + # Non-expiring credit { "address": "0xdetails3", "amount": 1000, @@ -1664,7 +1644,7 @@ def test_credit_balance_details_partial_consumption(session_factory: DbSessionFa "last_update": ts, "expiration_date": None, }, - # Expiring credit + # Expiring credit (will be drained first under sort-by-expiration) { "address": "0xdetails3", "amount": 500, @@ -1674,7 +1654,7 @@ def test_credit_balance_details_partial_consumption(session_factory: DbSessionFa "last_update": ts, "expiration_date": exp1, }, - # Expense consuming 700 from the oldest (non-expiring) credit + # Expense consuming 700 (drains 500 from exp1 then 200 from non-expiring) { "address": "0xdetails3", "amount": -700, @@ -1694,13 +1674,12 @@ def test_credit_balance_details_partial_consumption(session_factory: DbSessionFa total, details = get_credit_balance_with_details( session=session, address="0xdetails3" ) - # 1000 - 700 = 300 non-expiring remaining, 500 expiring remaining + # Soonest-first drain: exp1 -> 0, non-expiring -> 800. Only the + # non-expiring bucket has a positive remainder. assert total == 800 - assert len(details) == 2 + assert len(details) == 1 assert details[0].expiration_date is None - assert details[0].amount == 300 - assert details[1].expiration_date == exp1 - assert details[1].amount == 500 + assert details[0].amount == 800 def test_credit_balance_details_fully_consumed(session_factory: DbSessionFactory): @@ -1807,7 +1786,10 @@ def test_credit_balance_details_no_history(session_factory: DbSessionFactory): def test_credit_balance_details_matches_total(session_factory: DbSessionFactory): - """Sum of details amounts equals the total balance.""" + """Sum of details amounts equals the total balance, with the + sort-by-expiration policy producing a different per-bucket breakdown + than the legacy sort-by-issuance one (same total). + """ ts = dt.datetime(2026, 3, 1, tzinfo=dt.timezone.utc) exp1 = dt.datetime(2026, 9, 1, tzinfo=dt.timezone.utc) exp2 = dt.datetime(2026, 12, 1, tzinfo=dt.timezone.utc) @@ -1840,7 +1822,8 @@ def test_credit_balance_details_matches_total(session_factory: DbSessionFactory) "last_update": ts, "expiration_date": exp2, }, - # Expense consuming 1200 total (FIFO: 1000 from non-expiring, 200 from exp1) + # Expense consuming 1200. Sort-by-expiration drains exp1 (800) then + # exp2 (400); non-expiring (sentinel) is untouched. { "address": "0xdetails6", "amount": -1200, @@ -1860,16 +1843,18 @@ def test_credit_balance_details_matches_total(session_factory: DbSessionFactory) total, details = get_credit_balance_with_details( session=session, address="0xdetails6" ) - # 1000 - 1000 = 0 non-expiring, 800 - 200 = 600 exp1, 600 exp2 + # 1000 non-expiring + 0 exp1 + 200 exp2 = 1200 assert total == 1200 details_sum = sum(d.amount for d in details) assert details_sum == total + # Details order: non-expiring first (None), then by expiration ASC. + # exp1 is drained to 0 so it's absent from the positive-amount list. assert len(details) == 2 - assert details[0].expiration_date == exp1 - assert details[0].amount == 600 + assert details[0].expiration_date is None + assert details[0].amount == 1000 assert details[1].expiration_date == exp2 - assert details[1].amount == 600 + assert details[1].amount == 200 # ── Volume (origin_ref) consumed credits tests ─────────────────────── @@ -2488,3 +2473,461 @@ def test_cursor_pagination_nullable_sort_nulls_on_last_page( ) assert len(all_refs) == 5 assert len(set(all_refs)) == 5 + + +# --------------------------------------------------------------------------- +# Bucket-cache eager-update behaviour +# --------------------------------------------------------------------------- + +from aleph.db.accessors.balances import ( # noqa: E402 + _apply_grant_bucket, + _compute_transfer_entries_by_expiration, + _consume_address_credits, + count_credit_balances, + get_credit_balances, +) +from aleph.toolkit.infinity import INFINITY # noqa: E402 + + +def _bucket_amount(session, address, expiration_date) -> int: + row = session.execute( + select(AlephCreditBalanceDb).where( + AlephCreditBalanceDb.address == address, + AlephCreditBalanceDb.expiration_date == expiration_date, + ) + ).scalar_one_or_none() + return 0 if row is None else int(row.amount) + + +def test_apply_grant_bucket_inserts_and_increments( + session_factory: DbSessionFactory, +): + """Grants insert a new bucket and increment an existing one (same expiration).""" + exp = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xbucket1", 100, exp) + session.commit() + assert _bucket_amount(session, "0xbucket1", exp) == 100 + + _apply_grant_bucket(session, "0xbucket1", 50, exp) + session.commit() + assert _bucket_amount(session, "0xbucket1", exp) == 150 + + +def test_apply_grant_bucket_none_expiration_uses_sentinel( + session_factory: DbSessionFactory, +): + """A None expiration maps to the INFINITY sentinel bucket.""" + with session_factory() as session: + _apply_grant_bucket(session, "0xnoexp", 200, None) + session.commit() + assert _bucket_amount(session, "0xnoexp", INFINITY) == 200 + + +def test_consume_address_credits_drains_soonest_first( + session_factory: DbSessionFactory, +): + """Consumption drains the soonest-expiring non-expired bucket first.""" + exp_soon = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + exp_late = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xsoonest", 100, exp_late) + _apply_grant_bucket(session, "0xsoonest", 100, exp_soon) + session.commit() + + consumed = _consume_address_credits(session, "0xsoonest", 150) + session.commit() + + # 100 from soon, 50 from late. + assert consumed == [(100, exp_soon), (50, exp_late)] + assert _bucket_amount(session, "0xsoonest", exp_soon) == 0 + assert _bucket_amount(session, "0xsoonest", exp_late) == 50 + + +def test_consume_address_credits_skips_expired_buckets( + session_factory: DbSessionFactory, +): + """A bucket whose expiration has already passed is not drained.""" + past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xskipexpired", 100, past) + _apply_grant_bucket(session, "0xskipexpired", 100, future) + session.commit() + + consumed = _consume_address_credits(session, "0xskipexpired", 100) + session.commit() + + assert consumed == [(100, future)] + assert _bucket_amount(session, "0xskipexpired", past) == 100 + assert _bucket_amount(session, "0xskipexpired", future) == 0 + + +def test_consume_address_credits_underfunded_returns_partial( + session_factory: DbSessionFactory, +): + """Consuming more than available drains everything available and stops at zero.""" + exp = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xshort", 75, exp) + session.commit() + + consumed = _consume_address_credits(session, "0xshort", 200) + session.commit() + + assert consumed == [(75, exp)] + assert _bucket_amount(session, "0xshort", exp) == 0 + + +def test_get_credit_balance_sums_non_expired_buckets( + session_factory: DbSessionFactory, +): + past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xsum", 100, past) + _apply_grant_bucket(session, "0xsum", 250, future) + _apply_grant_bucket(session, "0xsum", 50, None) + session.commit() + assert get_credit_balance(session, "0xsum") == 300 + + +def test_get_credit_balance_floors_at_zero(session_factory: DbSessionFactory): + """Negative bucket amounts (defensive) are floored at zero on read.""" + with session_factory() as session: + session.add( + AlephCreditBalanceDb( + address="0xneg", + expiration_date=INFINITY, + amount=-10, + ) + ) + session.commit() + assert get_credit_balance(session, "0xneg") == 0 + + +def test_get_credit_balance_with_details_groups_by_expiration( + session_factory: DbSessionFactory, +): + """``get_credit_balance_with_details`` returns one entry per bucket with + the no-expiration sentinel mapped back to None and sorted first. + """ + exp_a = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + exp_b = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xdetails", 100, exp_b) + _apply_grant_bucket(session, "0xdetails", 50, None) + _apply_grant_bucket(session, "0xdetails", 25, exp_a) + session.commit() + + total, details = get_credit_balance_with_details(session, "0xdetails") + assert total == 175 + # No-expiration first, then earliest expiration next. + assert [d.expiration_date for d in details] == [None, exp_a, exp_b] + assert [d.amount for d in details] == [50, 25, 100] + + +def test_compute_transfer_entries_caps_at_source_and_requested(): + """``min(source, requested)`` is applied per consumed slice; same effective + expiration entries are merged.""" + source_exp = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + later_requested = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + earlier_requested = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + + # Requested later than source: source wins. + assert _compute_transfer_entries_by_expiration( + [(100, source_exp)], later_requested + ) == [(100, source_exp)] + + # Requested earlier than source: requested wins. + assert _compute_transfer_entries_by_expiration( + [(100, source_exp)], earlier_requested + ) == [(100, earlier_requested)] + + # Source is the sentinel (no expiration): requested propagates. + assert _compute_transfer_entries_by_expiration( + [(100, INFINITY)], later_requested + ) == [(100, later_requested)] + + # Source is the sentinel and no requested: recipient gets no expiration. + assert _compute_transfer_entries_by_expiration([(100, INFINITY)], None) == [ + (100, None) + ] + + # Two consumed slices with the same effective expiration merge. + assert _compute_transfer_entries_by_expiration( + [(50, source_exp), (50, INFINITY)], earlier_requested + ) == [(100, earlier_requested)] + + +def test_distribution_creates_buckets_with_correct_expiration( + session_factory: DbSessionFactory, +): + """Distribution rows produce buckets at exactly the message expiration_date.""" + ts = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + exp_dt = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + exp_ms = int(exp_dt.timestamp() * 1000) + + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xnewdist", + "amount": 42, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + "expiration": exp_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="msg_new_dist", + message_timestamp=ts, + ) + session.commit() + + # Bucket exists at exactly the expected expiration (not the sentinel). + assert _bucket_amount(session, "0xnewdist", exp_dt) == 42 * 10000 + assert _bucket_amount(session, "0xnewdist", INFINITY) == 0 + + +def test_expense_drains_soonest_bucket_first(session_factory: DbSessionFactory): + """An expense first drains the soonest-expiring bucket. The history row + still records the full intended amount even if buckets are under-drained. + """ + dist1 = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + dist2 = dt.datetime(2023, 1, 2, 12, 0, 0, tzinfo=dt.timezone.utc) + exp_dt = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + exp_ms = int(exp_dt.timestamp() * 1000) + expense_ts = dt.datetime(2023, 1, 3, 12, 0, 0, tzinfo=dt.timezone.utc) + + with session_factory() as session: + # Bucket A: 100 expiring at exp_dt (issued FIRST). + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xexpense", + "amount": 100, + "price": "0.1", + "tx_hash": "0xa", + "provider": "p", + "expiration": exp_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="dist_a", + message_timestamp=dist1, + ) + # Bucket B: 200 non-expiring (issued SECOND). + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xexpense", + "amount": 200, + "price": "0.1", + "tx_hash": "0xb", + "provider": "p", + } + ], + token="ALEPH", + chain="ETH", + message_hash="dist_b", + message_timestamp=dist2, + ) + # Expense 150: soonest-first drains A entirely (100) then 50 from B. + update_credit_balances_expense( + session=session, + credits_list=[{"address": "0xexpense", "amount": 150, "ref": "r"}], + message_hash="exp_msg", + message_timestamp=expense_ts, + ) + session.commit() + + assert _bucket_amount(session, "0xexpense", exp_dt) == 0 + assert _bucket_amount(session, "0xexpense", INFINITY) == 150 * 10000 + # The history row carries the full intent. + record = session.execute( + select(AlephCreditHistoryDb).where( + AlephCreditHistoryDb.credit_ref == "exp_msg" + ) + ).scalar_one() + assert record.amount == -150 * 10000 + + +def test_transfer_recipient_inherits_capped_expiration( + session_factory: DbSessionFactory, +): + """Sender's soonest-expiring bucket is drained first; recipient inherits + min(source_exp, requested_exp). + """ + dist_ts = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + xfer_ts = dt.datetime(2023, 2, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + soon = dt.datetime(2050, 1, 1, tzinfo=dt.timezone.utc) + soon_ms = int(soon.timestamp() * 1000) + requested = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + requested_ms = int(requested.timestamp() * 1000) + + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xsender", + "amount": 100, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + "expiration": soon_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="dist_sender", + message_timestamp=dist_ts, + ) + update_credit_balances_transfer( + session=session, + credits_list=[ + {"address": "0xrecipient", "amount": 50, "expiration": requested_ms} + ], + sender_address="0xsender", + whitelisted_addresses=[], + message_hash="xfer", + message_timestamp=xfer_ts, + ) + session.commit() + + # Sender drained from the soon bucket. + assert _bucket_amount(session, "0xsender", soon) == 50 * 10000 + # Recipient bucket capped at source's earlier expiration. + assert _bucket_amount(session, "0xrecipient", soon) == 50 * 10000 + assert _bucket_amount(session, "0xrecipient", requested) == 0 + + +def test_count_and_list_credit_balances(session_factory: DbSessionFactory): + """Multi-address aggregation: ``get_credit_balances`` and + ``count_credit_balances`` aggregate per-address sums of non-expired buckets. + """ + past = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) + future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + _apply_grant_bucket(session, "0xa", 100, future) + _apply_grant_bucket(session, "0xb", 50, future) + _apply_grant_bucket(session, "0xc", 999, past) # expired + _apply_grant_bucket(session, "0xd", 0, future) # zero, no positive sum + session.commit() + + rows = get_credit_balances(session) + addresses = {address for address, _ in rows} + # 0xc has only expired credits, 0xd has zero — both excluded. + assert addresses == {"0xa", "0xb"} + assert dict(rows) == {"0xa": 100, "0xb": 50} + + assert count_credit_balances(session) == 2 + assert count_credit_balances(session, min_balance=75) == 1 + + +def test_repair_rebuilds_buckets_from_history(session_factory: DbSessionFactory): + """The repair function reproduces buckets from credit_history alone. + Simulates a fresh table by deleting all buckets then running repair. + """ + from aleph.repair import _repair_credit_balances + + dist1 = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + dist2 = dt.datetime(2023, 1, 2, 12, 0, 0, tzinfo=dt.timezone.utc) + expense_ts = dt.datetime(2023, 1, 3, 12, 0, 0, tzinfo=dt.timezone.utc) + exp_dt = dt.datetime(2200, 1, 1, tzinfo=dt.timezone.utc) + exp_ms = int(exp_dt.timestamp() * 1000) + + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xrepair", + "amount": 100, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + "expiration": exp_ms, + } + ], + token="ALEPH", + chain="ETH", + message_hash="repair_dist_a", + message_timestamp=dist1, + ) + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xrepair", + "amount": 200, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + } + ], + token="ALEPH", + chain="ETH", + message_hash="repair_dist_b", + message_timestamp=dist2, + ) + update_credit_balances_expense( + session=session, + credits_list=[{"address": "0xrepair", "amount": 150, "ref": "r"}], + message_hash="repair_exp", + message_timestamp=expense_ts, + ) + session.commit() + + # Capture the expected post-eager state. + expected_bucket_amount = _bucket_amount(session, "0xrepair", INFINITY) + assert expected_bucket_amount == 150 * 10000 + + # Wipe the cache, simulating a freshly migrated DB. + session.execute(delete(AlephCreditBalanceDb)) + session.commit() + + _repair_credit_balances(session_factory) + + with session_factory() as session: + assert _bucket_amount(session, "0xrepair", INFINITY) == expected_bucket_amount + # The expiring bucket was drained first and has no remaining amount, + # so the repair does not write a row for it. + assert _bucket_amount(session, "0xrepair", exp_dt) == 0 + + +def test_repair_is_idempotent(session_factory: DbSessionFactory): + from aleph.repair import _repair_credit_balances + + ts = dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) + with session_factory() as session: + update_credit_balances_distribution( + session=session, + credits_list=[ + { + "address": "0xidemp", + "amount": 42, + "price": "0.1", + "tx_hash": "0xt", + "provider": "p", + } + ], + token="ALEPH", + chain="ETH", + message_hash="idemp_dist", + message_timestamp=ts, + ) + session.commit() + + _repair_credit_balances(session_factory) + _repair_credit_balances(session_factory) + + with session_factory() as session: + assert _bucket_amount(session, "0xidemp", INFINITY) == 42 * 10000 diff --git a/tests/message_processing/test_process_stores.py b/tests/message_processing/test_process_stores.py index cbe692c67..09633d78a 100644 --- a/tests/message_processing/test_process_stores.py +++ b/tests/message_processing/test_process_stores.py @@ -19,6 +19,7 @@ from aleph.db.accessors.messages import get_message_by_item_hash from aleph.db.models import ( AlephBalanceDb, + AlephCreditBalanceDb, AlephCreditHistoryDb, MessageDb, MessageStatusDb, @@ -905,16 +906,27 @@ async def test_new_store_message_with_sufficient_credits( ) message.parsed_content = content - # Add sufficient credit balance via credit history (required by get_credit_balance) + # Seed the eager bucket cache so get_credit_balance sees the credits. + # (The credit_history row mirrors what the writer would have stored; + # production goes through update_credit_balances_distribution.) + from aleph.toolkit.infinity import INFINITY + session.add( AlephCreditHistoryDb( address=address, - amount=1000000000, # Large enough to cover 1 day of storage + amount=1000000000, credit_ref="test-credit-ref", credit_index=0, message_timestamp=timestamp_to_datetime(CREDIT_ONLY_CUTOFF_TIMESTAMP), ) ) + session.add( + AlephCreditBalanceDb( + address=address, + expiration_date=INFINITY, + amount=1000000000, + ) + ) session.commit() # Should pass the balance check @@ -1298,11 +1310,13 @@ async def test_legacy_store_with_credit_payment_and_credits( ) message.parsed_content = content - # Add sufficient credit balance via credit history (required by get_credit_balance) + # Seed the eager bucket cache so get_credit_balance sees the credits. + from aleph.toolkit.infinity import INFINITY + session.add( AlephCreditHistoryDb( address=address, - amount=1000000000, # Large enough to cover 1 day of storage + amount=1000000000, credit_ref="test-credit-ref", credit_index=0, message_timestamp=timestamp_to_datetime( @@ -1310,6 +1324,13 @@ async def test_legacy_store_with_credit_payment_and_credits( ), ) ) + session.add( + AlephCreditBalanceDb( + address=address, + expiration_date=INFINITY, + amount=1000000000, + ) + ) session.commit() # Should pass the balance check