Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9652d46
Hashes Storage Optimizations
Ouziel May 12, 2026
58c61ea
Ruff
Ouziel May 12, 2026
673287d
fix unit test
Ouziel May 12, 2026
58eacc6
fix tests
Ouziel May 12, 2026
316a5b7
fix testnet4 tests
Ouziel May 12, 2026
e60066a
more test fix
Ouziel May 12, 2026
6e94d14
Optimize API hot path: cache schema probe, drop redundant hex_lower U…
Ouziel May 13, 2026
6972f93
Optimize API COUNT(*) hot path: skip redundant LEFT JOINs in count qu…
Ouziel May 13, 2026
bff8da9
ruff
Ouziel May 13, 2026
3ef9828
Fix slow messages queries: ensure indexes exist after compact-hash mi…
Ouziel May 13, 2026
9d79b31
lint
Ouziel May 13, 2026
24431ea
fixes and more tests
Ouziel May 14, 2026
903caa3
ruff
Ouziel May 14, 2026
d4d3c55
VACUUM after hashes compaction
Ouziel May 14, 2026
6dfe776
lint
Ouziel May 14, 2026
c6da542
fixes
Ouziel May 14, 2026
5c24616
lint and fix boolean in API
Ouziel May 14, 2026
3ff03a9
update release notes
Ouziel May 14, 2026
d963ea4
fix unit tests
Ouziel May 14, 2026
e5c9c00
comment false positive
Ouziel May 14, 2026
a99bb4f
Merge branch 'develop' into hashes
Ouziel May 27, 2026
ff88be0
fix unit tests
Ouziel May 27, 2026
91add26
fix race condition in regtest test
Ouziel May 28, 2026
05549b5
Merge branch 'develop' into hashes
Ouziel Jun 15, 2026
08a61e1
fix(tests): align merged develop tests with compact-hash storage schema
Ouziel Jun 15, 2026
1243b1d
refactor(ledger): replace composite match-IDs with (tx0_index, tx1_in…
Ouziel Jun 15, 2026
5d348d7
fix tests
Ouziel Jun 16, 2026
dfdc441
feat(ledger): normalize asset names to a compact asset_index FK
Ouziel Jun 16, 2026
dd0a4ae
fix bootstrap test
Ouziel Jun 17, 2026
e1f39a6
property test diagnose
Ouziel Jun 17, 2026
8232779
fix(ledger): reset asset_index cache after rollback
Ouziel Jun 17, 2026
d27112f
feat(ledger): normalize addresses to address_id FK and compact utxo c…
Ouziel Jun 17, 2026
38bbc37
fix ruff and unit tests
Ouziel Jun 17, 2026
67e2706
fix tests
Ouziel Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 77 additions & 16 deletions counterparty-core/counterpartycore/lib/api/apiwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from counterpartycore.lib import config, exceptions
from counterpartycore.lib.api import dbbuilder
from counterpartycore.lib.parser import utxosinfo
from counterpartycore.lib.utils import database
from counterpartycore.lib.utils import database, hashcodec
from counterpartycore.lib.utils.helpers import format_duration

logger = logging.getLogger(config.LOGGER_NAME)
Expand Down Expand Up @@ -161,9 +161,63 @@ def get_event_bindings(event):
return event_bindings


def insert_event_to_sql(event):
# Tables in ``STATE_DB_TABLES`` whose schema (mirrored from ``ledger_db``)
# replaced a legacy hex hash column with a ``*_tx_index`` integer FK. The
# event JSON still carries the legacy hash (consensus-stable), so the
# apiwatcher must resolve hex_tx_hash -> tx_index against
# ``ledger_db.transactions`` before binding into the state_db schema.
#
# This map mirrors ``ledger.events.HASH_TO_TX_INDEX_FK`` defensively: tables
# not in ``STATE_DB_TABLES`` today (cancels, dispenses, dispenser_refills,
# fairmints) are listed here so that adding them to ``STATE_DB_TABLES``
# later does not silently break replication.
#
# Shape: {table_name: {legacy_hash_key: replacement_tx_index_key}}.
_STATE_DB_HASH_FK_RESOLUTION = {
    "cancels": {"offer_hash": "offer_tx_index"},
    "dispenses": {"dispenser_tx_hash": "dispenser_tx_index"},
    "dispenser_refills": {"dispenser_tx_hash": "dispenser_tx_index"},
    "fairmints": {"fairminter_tx_hash": "fairminter_tx_index"},
    "pool_matches": {"order_tx_hash": "order_tx_index"},
}


def _resolve_state_db_fk_columns(ledger_db, table, event_bindings):
    """Swap legacy hash keys in ``event_bindings`` for ``*_tx_index`` keys.

    For each ``hash_key -> index_key`` pair registered for ``table`` in
    ``_STATE_DB_HASH_FK_RESOLUTION``, the hash key is removed from
    ``event_bindings`` and ``index_key`` is set to the ``tx_index`` of the
    matching row in ``transactions``.

    The dict is modified in place and nothing is returned. The call is a
    no-op when ``table`` has no registered mapping or ``ledger_db`` is
    ``None``. ``index_key`` is set to ``None`` when the hash value itself is
    ``None`` or when no ``transactions`` row matches.
    """
    fk_map = _STATE_DB_HASH_FK_RESOLUTION.get(table)
    if not fk_map or ledger_db is None:
        return

    cursor = ledger_db.cursor()
    # ``try/finally`` so the cursor is released even if the hash conversion
    # or the lookup query raises part-way through the loop.
    try:
        for hash_key, index_key in fk_map.items():
            if hash_key not in event_bindings:
                continue
            hex_value = event_bindings.pop(hash_key)
            if hex_value is None:
                event_bindings[index_key] = None
                continue
            blob = hashcodec.hash_to_db(hex_value)
            row = cursor.execute(
                "SELECT tx_index FROM transactions WHERE tx_hash = ?", (blob,)
            ).fetchone()
            # NOTE(review): indexing by column name assumes the connection
            # yields mapping-like rows -- confirm against the ledger_db setup.
            event_bindings[index_key] = row["tx_index"] if row else None
    finally:
        cursor.close()


def _normalize_event_bindings_for_state_db(event_bindings):
    """Re-encode hash-named bindings through ``hashcodec.hash_to_db``.

    Every key of ``event_bindings`` that is listed in
    ``hashcodec.HASH_COLUMN_NAMES`` has its value replaced by the result of
    ``hashcodec.hash_to_db``; all other entries are left alone. The dict is
    updated in place and also returned.
    """
    # Walk a snapshot of the keys so the dict can be rewritten during the loop.
    for name in tuple(event_bindings):
        if name not in hashcodec.HASH_COLUMN_NAMES:
            continue
        event_bindings[name] = hashcodec.hash_to_db(event_bindings[name])
    return event_bindings


def insert_event_to_sql(event, ledger_db=None):
event_bindings = get_event_bindings(event)
event_bindings["block_index"] = event["block_index"]
_resolve_state_db_fk_columns(ledger_db, event["category"], event_bindings)
_normalize_event_bindings_for_state_db(event_bindings)
sql_bindings = []
sql = f"INSERT INTO {event['category']} "
names = []
Expand All @@ -174,13 +228,16 @@ def insert_event_to_sql(event):
return sql, sql_bindings


def update_event_to_sql(event):
def update_event_to_sql(event, ledger_db=None):
event_bindings = get_event_bindings(event)
event_bindings["block_index"] = event["block_index"]

if event_bindings["block_index"] == config.MEMPOOL_BLOCK_INDEX:
return None, []

_resolve_state_db_fk_columns(ledger_db, event["category"], event_bindings)
_normalize_event_bindings_for_state_db(event_bindings)

id_field_names = UPDATE_EVENTS_ID_FIELDS[event["event"]]

sql_bindings = []
Expand All @@ -205,11 +262,11 @@ def update_event_to_sql(event):
return sql, sql_bindings


def event_to_sql(event):
def event_to_sql(event, ledger_db=None):
if event["command"] == "insert":
return insert_event_to_sql(event)
return insert_event_to_sql(event, ledger_db=ledger_db)
if event["command"] in ["update", "parse"]:
return update_event_to_sql(event)
return update_event_to_sql(event, ledger_db=ledger_db)
return None, []


Expand Down Expand Up @@ -498,15 +555,19 @@ def update_fairminters(state_db, event):
earned_quantity = COALESCE(earned_quantity, 0) + :earn_quantity,
commission = COALESCE(commission, 0) + :commission,
paid_quantity = COALESCE(paid_quantity, 0) + :paid_quantity
WHERE tx_hash = :fairminter_tx_hash
WHERE tx_hash = :fairminter_tx_hash_blob
"""
cursor.execute(sql, event_bindings)
bindings = dict(event_bindings)
bindings["fairminter_tx_hash_blob"] = hashcodec.hash_to_db(
event_bindings.get("fairminter_tx_hash")
)
cursor.execute(sql, bindings)


def update_consolidated_tables(state_db, event):
def update_consolidated_tables(state_db, event, ledger_db=None):
if event["category"] in STATE_DB_TABLES:
cursor = state_db.cursor()
sql, sql_bindings = event_to_sql(event)
sql, sql_bindings = event_to_sql(event, ledger_db=ledger_db)
if sql is not None:
cursor.execute(sql, sql_bindings)
# because no event for balance update
Expand All @@ -516,12 +577,12 @@ def update_consolidated_tables(state_db, event):
update_fairminters(state_db, event)


def update_state_db_tables(state_db, event):
def update_state_db_tables(state_db, event, ledger_db=None):
if event["event"] not in SKIP_EVENTS:
update_address_events(state_db, event)
update_all_expiration(state_db, event)
update_assets_info(state_db, event)
update_consolidated_tables(state_db, event)
update_consolidated_tables(state_db, event, ledger_db=ledger_db)


def update_last_parsed_events_cache(state_db, event=None):
Expand Down Expand Up @@ -586,10 +647,10 @@ def get_last_block_parsed(state_db, no_cache=False):
return 0


def parse_event(state_db, event):
def parse_event(state_db, event, ledger_db=None):
with state_db:
logger.trace(f"Parsing event: {event}")
update_state_db_tables(state_db, event)
update_state_db_tables(state_db, event, ledger_db=ledger_db)
update_last_parsed_events(state_db, event)
update_events_count(state_db, event)
update_transaction_types_count(state_db, event)
Expand All @@ -605,7 +666,7 @@ def catch_up(ledger_db, state_db, watcher=None):
event_parsed = 0
next_event = get_next_event_to_parse(ledger_db, state_db)
while next_event and (watcher is None or not watcher.stop_event.is_set()):
parse_event(state_db, next_event)
parse_event(state_db, next_event, ledger_db=ledger_db)
event_parsed += 1
if event_parsed % 50000 == 0:
duration = time.time() - start_time
Expand Down Expand Up @@ -672,7 +733,7 @@ def parse_next_event(ledger_db, state_db):
if next_event is None:
raise exceptions.NoEventToParse("No event to parse")

parse_event(state_db, next_event)
parse_event(state_db, next_event, ledger_db=ledger_db)


class APIWatcher(threading.Thread):
Expand Down
12 changes: 12 additions & 0 deletions counterparty-core/counterpartycore/lib/api/dbbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@
logger = logging.getLogger(config.LOGGER_NAME)

MIGRATIONS_AFTER_ROLLBACK = [
# 0002 / 0003 are included so that pre-existing state DBs built before the
# compact-hash storage migration get rebuilt with the ``hex_lower(...)``
# projection on the next rollback:
# - 0002 populates ``parsed_events.event_hash`` (TEXT) from
# ``ledger_db.messages.event_hash`` (now BLOB(32)); without the
# ``hex_lower`` projection, BLOBs end up stored in the TEXT column.
# - 0003 has the same problem on ``all_expirations.object_id``.
# ``parsed_events`` and ``all_expirations`` are also in ``ROLLBACKABLE_TABLES``
# (DELETE-only path); that DELETE is harmless because each table is dropped
# and recreated when its migration is re-applied.
"0002.create_and_populate_parsed_events",
"0003.create_and_populate_all_expirations",
"0004.create_and_populate_assets_info",
"0005.create_and_populate_events_count",
"0006.create_and_populate_consolidated_tables",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# file: counterpartycore/lib/api/migrations/0002.create_and_populate_parsed_events.py
#
import binascii
import logging
import time

Expand All @@ -17,13 +18,40 @@ def dict_factory(cursor, row):
return dict(zip(fields, row, strict=True))


def _hex_lower_udf(value):
    """SQLite UDF: render a hash value as a lowercase hex string.

    ``None`` maps to ``None`` (SQL NULL). A ``str`` is returned lowercased
    without further validation. Any other value is treated as bytes-like and
    hex-encoded, so a 32-byte BLOB becomes a 64-character string.

    This migration uses it to project ``ledger_db.messages.event_hash`` into
    the TEXT column ``parsed_events.event_hash`` as hex text rather than as a
    raw BLOB.
    """
    if isinstance(value, str):
        return value.lower()
    if value is not None:
        return binascii.hexlify(value).decode("ascii")
    return None


def apply(db):
start_time = time.time()
logger.debug("Populating the `parsed_events` table...")

if hasattr(db, "row_factory"):
db.row_factory = dict_factory

# Register hex_lower so the ``event_hash`` projection produces lowercase
# hex strings even though the underlying ledger column is BLOB(32). The
# apsw-registered ``hex_lower`` UDF on the main connection is not visible
# to the yoyo stdlib sqlite3 connection, so register a local shim.
try:
db.create_function("hex_lower", 1, _hex_lower_udf)
except AttributeError:
# apsw path: ``createscalarfunction`` instead of ``create_function``.
db.createscalarfunction("hex_lower", _hex_lower_udf, 1)

attached = (
db.execute(
"SELECT COUNT(*) AS count FROM pragma_database_list WHERE name = ?", ("ledger_db",)
Expand All @@ -42,9 +70,12 @@ def apply(db):
block_index INTEGER
);
""",
# ``ledger_db.messages.event_hash`` is BLOB(32); hex-encode it so
# the TEXT column stays consistent with rows produced at runtime
# (which bind hex strings via ``update_last_parsed_events``).
"""
INSERT INTO parsed_events (event_index, event, event_hash, block_index)
SELECT message_index AS event_index, event, event_hash, block_index
SELECT message_index AS event_index, event, hex_lower(event_hash) AS event_hash, block_index
FROM ledger_db.messages
""",
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# file: counterpartycore/lib/api/migrations/0003.create_and_populate_all_expirations.py
#
import binascii
import logging
import time

Expand All @@ -17,13 +18,37 @@ def dict_factory(cursor, row):
return dict(zip(fields, row, strict=True))


def _hex_lower_udf(value):
    """SQLite UDF: render a hash value as a lowercase hex string.

    ``None`` is passed through as ``None`` (SQL NULL), a ``str`` is returned
    lowercased as-is, and any other value is hex-encoded as bytes-like data
    (a 32-byte BLOB yields 64 characters).

    It is registered on the migration connection so the ``hex_lower(...)``
    projections in this migration's INSERT statements can run there.
    """
    if value is None:
        return None
    already_text = isinstance(value, str)
    return value.lower() if already_text else binascii.hexlify(value).decode("ascii")


def apply(db):
start_time = time.time()
logger.debug("Populating the `all_expirations` table...")

if hasattr(db, "row_factory"):
db.row_factory = dict_factory

# Register hex_lower so the ``object_id`` projection produces lowercase
# hex strings even though the underlying columns are BLOB(32).
try:
db.create_function("hex_lower", 1, _hex_lower_udf)
except AttributeError:
# apsw path: ``createscalarfunction`` instead of ``create_function``.
db.createscalarfunction("hex_lower", _hex_lower_udf, 1)

attached = (
db.execute(
"SELECT COUNT(*) AS count FROM pragma_database_list WHERE name = ?", ("ledger_db",)
Expand All @@ -41,9 +66,13 @@ def apply(db):
block_index INTEGER
);
""",
# ``*_hash`` columns on the ledger DB are now BLOB(32); convert to
# lowercase hex via the ``hex_lower`` UDF so ``object_id`` stays a
# 64-char hex string (matching the runtime path that gets the value
# via ``json.loads(event["bindings"])``).
"""
INSERT INTO all_expirations (object_id, block_index, type)
SELECT order_hash AS object_id, block_index, 'order' AS type
SELECT hex_lower(order_hash) AS object_id, block_index, 'order' AS type
FROM ledger_db.order_expirations
""",
"""
Expand All @@ -53,7 +82,7 @@ def apply(db):
""",
"""
INSERT INTO all_expirations (object_id, block_index, type)
SELECT bet_hash AS object_id, block_index, 'bet' AS type
SELECT hex_lower(bet_hash) AS object_id, block_index, 'bet' AS type
FROM ledger_db.bet_expirations
""",
"""
Expand All @@ -63,7 +92,7 @@ def apply(db):
""",
"""
INSERT INTO all_expirations (object_id, block_index, type)
SELECT rps_hash AS object_id, block_index, 'rps' AS type
SELECT hex_lower(rps_hash) AS object_id, block_index, 'rps' AS type
FROM ledger_db.rps_expirations
""",
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,19 @@
earned_quantity = (
SELECT SUM(earn_quantity)
FROM fairmints
WHERE fairmints.fairminter_tx_hash = fairminters.tx_hash
WHERE fairmints.fairminter_tx_index = fairminters.tx_index
AND fairmints.status = 'valid'
),
paid_quantity = (
SELECT SUM(paid_quantity)
FROM fairmints
WHERE fairmints.fairminter_tx_hash = fairminters.tx_hash
WHERE fairmints.fairminter_tx_index = fairminters.tx_index
AND fairmints.status = 'valid'
),
commission = (
SELECT SUM(commission)
FROM fairmints
WHERE fairmints.fairminter_tx_hash = fairminters.tx_hash
WHERE fairmints.fairminter_tx_index = fairminters.tx_index
AND fairmints.status = 'valid'
);
""",
Expand Down
Loading
Loading