diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 3e9502e3048..fc5824ce1bc 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -102,6 +102,7 @@ def __init__(self, db: 'WalletDB', config: 'SimpleConfig', *, name: str = None): self._get_balance_cache = {} self._get_utxos_cache = {} + self._subscribed_outpoints = set() self.load_and_cleanup() @@ -231,10 +232,20 @@ async def stop(self): self.network = None def add_address(self, address: str) -> None: - if address not in self.db.history: - self.db.history[address] = [] + if address in self.db.history: + return + self.db.history[address] = [] if self.synchronizer: - self.synchronizer.add(address) + self.synchronizer.add_address(address) + self.up_to_date_changed() + + def subscribe_to_outpoint(self, outpoint): + if outpoint in self._subscribed_outpoints: + return + self._subscribed_outpoints.add(outpoint) + if self.synchronizer: + # fixme: launch polling task in synchronizer + self.synchronizer.add_outpoint(outpoint) self.up_to_date_changed() @with_lock @@ -1064,6 +1075,12 @@ def subscribe_to_outputs(self, spender_txid: str): if not self.is_mine(o.address): self.add_address(o.address) + def subscribe_to_tx_outpoints(self, spender_txid: str): + spender_tx = self.get_transaction(spender_txid) + for i, o in enumerate(spender_tx.outputs()): + txo = spender_txid + ':%d'%i + self.subscribe_to_outpoint(txo) + def get_tx_mined_depth(self, txid: str): if not txid: return TxMinedDepth.FREE diff --git a/electrum/bip39_recovery.py b/electrum/bip39_recovery.py index 3b857b3ebc1..141a43b10aa 100644 --- a/electrum/bip39_recovery.py +++ b/electrum/bip39_recovery.py @@ -59,8 +59,7 @@ async def account_has_history(network: 'Network', account_node: BIP32Node, scrip pubkey = address_node.eckey.get_public_key_hex() address = bitcoin.pubkey_to_address(script_type, pubkey) script = bitcoin.address_to_script(address) - scripthash = bitcoin.script_to_scripthash(script) - get_history = network.get_history_for_scripthash(scripthash) + get_history = network.get_history_for_spk(script.hex()) get_history_tasks.append(await group.spawn(get_history)) for task in get_history_tasks: history = task.result() diff --git a/electrum/bitcoin.py b/electrum/bitcoin.py index 7afde6e54c7..3bc6ab997a5 100644 --- a/electrum/bitcoin.py +++ b/electrum/bitcoin.py @@ -512,6 +512,7 @@ def address_to_scripthash(addr: str, *, net=None) -> str: def script_to_scripthash(script: bytes) -> str: + assert type(script) is bytes h = sha256(script) return h[::-1].hex() diff --git a/electrum/commands.py b/electrum/commands.py index ac3143ed47f..5a11ef79a1e 100644 --- a/electrum/commands.py +++ b/electrum/commands.py @@ -477,8 +477,8 @@ async def getaddresshistory(self, address): arg:str:address:Bitcoin address """ - sh = bitcoin.address_to_scripthash(address) - return await self.network.get_history_for_scripthash(sh) + spk = bitcoin.address_to_script(address) + return await self.network.get_history_for_spk(spk.hex()) @command('wp') async def unlock(self, wallet: Abstract_Wallet = None, password=None): @@ -505,8 +505,8 @@ async def getaddressunspent(self, address): arg:str:address:Bitcoin address """ - sh = bitcoin.address_to_scripthash(address) - return await self.network.listunspent_for_scripthash(sh) + spk = bitcoin.address_to_script(address) + return await self.network.listunspent_for_spk(spk.hex()) @command('') async def serialize(self, jsontx): @@ -753,8 +753,8 @@ async def getaddressbalance(self, address): arg:str:address:Bitcoin address """ - sh = bitcoin.address_to_scripthash(address) - out = await self.network.get_balance_for_scripthash(sh) + spk = bitcoin.address_to_script(address) + out = await self.network.get_balance_for_spk(spk.hex()) out["confirmed"] = format_satoshis(out["confirmed"]) out["unconfirmed"] = format_satoshis(out["unconfirmed"]) return out diff --git a/electrum/interface.py b/electrum/interface.py index 918ead8deff..2ef37d5aa7b 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -58,7 +58,7 @@ from . import blockchain from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE from . import bitcoin -from .bitcoin import DummyAddress, DummyAddressUsedInTxException +from .bitcoin import DummyAddress, DummyAddressUsedInTxException, script_to_scripthash from . import constants from .i18n import _ from .logging import Logger @@ -82,6 +82,11 @@ MAX_NUM_HEADERS_PER_REQUEST = 2016 assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE +RPC_ERROR_HISTORY_TOO_LONG = 10001 + +class HistoryTooLong(Exception): + # we should not close the connection in that case + pass class NetworkTimeout: # seconds @@ -208,7 +213,10 @@ async def send_request(self, *args, timeout=None, **kwargs): raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e except CodeMessageError as e: self.maybe_log(f"--> {repr(e)} (id: {msg_id})") - raise + if e.code == RPC_ERROR_HISTORY_TOO_LONG: + raise HistoryTooLong() + else: + raise except BaseException as e: # cancellations, etc. are useful for debugging self.maybe_log(f"--> {repr(e)} (id: {msg_id})") raise @@ -225,14 +233,17 @@ def set_default_timeout(self, timeout): async def subscribe(self, method: str, params: List, queue: asyncio.Queue): # note: until the cache is written for the first time, # each 'subscribe' call might make a request on the network. - key = self.get_hashable_key_for_rpc_call(method, params) + params2 = params[:] + if method == 'blockchain.scriptpubkey.subscribe': + params2[0] = script_to_scripthash(bytes.fromhex(params2[0])) + key = self.get_hashable_key_for_rpc_call(method, params2) self.subscriptions[key].append(queue) if key in self.cache: result = self.cache[key] else: result = await self.send_request(method, params) self.cache[key] = result - await queue.put(params + [result]) + await queue.put(params2 + [result]) def unsubscribe(self, queue): """Unsubscribe a callback to free object references to enable GC.""" @@ -1417,7 +1428,7 @@ async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> Non raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID.")) # broadcast succeeded. # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting - # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC. + # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC. self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx) async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool: @@ -1442,16 +1453,14 @@ async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool: assert success # broadcast succeeded. # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting - # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC. + # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC. for tx, rawtx in zip(txs, rawtxs): self._rawtx_cache[tx.txid()] = bytes.fromhex(rawtx) return True - async def get_history_for_scripthash(self, sh: str) -> List[dict]: - if not is_hash256_str(sh): - raise Exception(f"{repr(sh)} is not a scripthash") + async def get_history_for_spk(self, spk: str) -> List[dict]: # do request - res = await self.session.send_request('blockchain.scripthash.get_history', [sh]) + res = await self.session.send_request('blockchain.scriptpubkey.get_history', [spk]) # check response assert_list_or_tuple(res) prev_height = 1 @@ -1481,14 +1490,12 @@ async def get_history_for_scripthash(self, sh: str) -> List[dict]: # Either server is sending garbage... or maybe if server is race-prone # a recently mined tx could be included in both last block and mempool? # Still, it's simplest to just disregard the response. - raise RequestCorrupted(f"server history has non-unique txids for sh={sh}") + raise RequestCorrupted(f"server history has non-unique txids for spk={spk}") return res - async def listunspent_for_scripthash(self, sh: str) -> List[dict]: - if not is_hash256_str(sh): - raise Exception(f"{repr(sh)} is not a scripthash") + async def listunspent_for_spk(self, spk: str) -> List[dict]: # do request - res = await self.session.send_request('blockchain.scripthash.listunspent', [sh]) + res = await self.session.send_request('blockchain.scriptpubkey.listunspent', [spk]) # check response assert_list_or_tuple(res) for utxo_item in res: @@ -1502,11 +1509,9 @@ async def listunspent_for_scripthash(self, sh: str) -> List[dict]: assert_hash256_str(utxo_item['tx_hash']) return res - async def get_balance_for_scripthash(self, sh: str) -> dict: - if not is_hash256_str(sh): - raise Exception(f"{repr(sh)} is not a scripthash") + async def get_balance_for_spk(self, spk: str) -> dict: # do request - res = await self.session.send_request('blockchain.scripthash.get_balance', [sh]) + res = await self.session.send_request('blockchain.scriptpubkey.get_balance', [spk]) # check response assert_dict_contains_field(res, field_name='confirmed') assert_dict_contains_field(res, field_name='unconfirmed') diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 6e624556891..1a8ea63cd77 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -1631,10 +1631,10 @@ async def reestablish_channel(self, chan: Channel): # We should let them know: self._send_channel_reestablish(chan) return + adb = self.lnworker.wallet.adb if self.network.blockchain().is_tip_stale() \ - or not self.lnworker.wallet.is_up_to_date() \ - or self.lnworker.current_target_feerate_per_kw(has_anchors=chan.has_anchors()) \ - is None: + or (adb.synchronizer and not adb.synchronizer.outpoints_up_to_date()) \ + or self.lnworker.current_target_feerate_per_kw(has_anchors=chan.has_anchors()) is None: # don't try to reestablish until we can do fee estimation and are up-to-date return # if we get here, we will try to do a proper reestablish diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py index 2a9d0faf705..3db62c7e4a2 100644 --- a/electrum/lnwatcher.py +++ b/electrum/lnwatcher.py @@ -37,10 +37,10 @@ def start_network(self, network: 'Network'): def stop(self): self.unregister_callbacks() - def remove_callback(self, address: str) -> None: - self.callbacks.pop(address, None) + def remove_callback(self, outpoint: str) -> None: + self.callbacks.pop(outpoint, None) - def add_callback( + def add_address_callback( self, address: str, callback: Callable[[], Awaitable[None]], @@ -57,6 +57,17 @@ def add_callback( self.adb.add_address(address) self.callbacks[address] = callback + def add_outpoint_callback( + self, + outpoint: str, + callback: Callable[[], Awaitable[None]], + *, + subscribe: bool = True, + ) -> None: + if subscribe: + self.adb.subscribe_to_outpoint(outpoint) + self.callbacks[outpoint] = callback + async def trigger_callbacks(self, *, requires_synchronizer: bool = True): if requires_synchronizer and not self.adb.synchronizer: self.logger.info("synchronizer not set yet") @@ -90,33 +101,32 @@ async def on_event_adb_added_verified_tx(self, adb, tx_hash): async def on_event_adb_set_up_to_date(self, adb): if adb != self.adb: return + # fixme: use separate events for addresses and outpoints await self.trigger_callbacks() def add_channel(self, chan: 'AbstractChannel') -> None: outpoint = chan.funding_outpoint.to_str() - address = chan.get_funding_address() - callback = lambda: self.check_onchain_situation(address, outpoint) - self.add_callback(address, callback, subscribe=chan.need_to_subscribe()) + callback = lambda: self.check_onchain_situation(outpoint) + self.add_outpoint_callback(outpoint, callback, subscribe=chan.need_to_subscribe()) @ignore_exceptions @log_exceptions - async def check_onchain_situation(self, address: str, funding_outpoint: str) -> None: - # early return if address has not been added yet - if not self.adb.is_mine(address): + async def check_onchain_situation(self, funding_outpoint: str) -> None: + # early return if funding_outpoint has not been added yet + if funding_outpoint not in self.adb._subscribed_outpoints: + self.logger.info(f"funding outpoint not subscribed yet") return - # inspect_tx_candidate might have added new addresses, in which case we return early - # note: maybe we should wait until adb.is_up_to_date... (?) funding_txid = funding_outpoint.split(':')[0] funding_height = self.adb.get_tx_height(funding_txid) closing_txid = self.adb.get_spender(funding_outpoint) closing_height = self.adb.get_tx_height(closing_txid) if closing_txid: - self.adb.subscribe_to_outputs(closing_txid) + self.adb.subscribe_to_tx_outpoints(closing_txid) closing_tx = self.adb.get_transaction(closing_txid) if closing_tx: keep_watching = await self.sweep_commitment_transaction(funding_outpoint, closing_tx) if not keep_watching: - self.remove_callback(address) + self.remove_callback(funding_outpoint) else: self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...") keep_watching = True @@ -191,7 +201,7 @@ async def sweep_commitment_transaction(self, funding_outpoint: str, closing_tx: # the spender might be the remote, revoked or not htlc_sweepinfo = chan.maybe_sweep_htlcs(closing_tx, spender_tx) if htlc_sweepinfo: - self.adb.subscribe_to_outputs(spender_txid) + self.adb.subscribe_to_tx_outpoints(spender_txid) for prevout2, htlc_sweep_info in htlc_sweepinfo.items(): self.lnworker.wallet.set_default_label(prevout2, htlc_sweep_info.name) if isinstance(htlc_sweep_info, KeepWatchingTXO): # haven't yet decided if we want to sweep @@ -276,6 +286,8 @@ def maybe_add_accounting_address(self, spender_txid: str, sweep_info: 'SweepInfo prev_tx = self.adb.get_transaction(prev_txid) txout = prev_tx.outputs()[int(prev_index)] self.lnworker.wallet._accounting_addresses.add(txout.address) + # fixme: replace accounting_addresses with accounting_outpoints + self.adb.add_address(txout.address) def maybe_add_pending_forceclose( self, diff --git a/electrum/network.py b/electrum/network.py index c74a0af3675..30400b2fdc1 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -1115,24 +1115,24 @@ async def get_transaction(self, tx_hash: str, *, timeout=None) -> str: @best_effort_reliable @catch_server_exceptions - async def get_history_for_scripthash(self, sh: str) -> List[dict]: + async def get_history_for_spk(self, spk: str) -> List[dict]: if self.interface is None: # handled by best_effort_reliable raise RequestTimedOut() - return await self.interface.get_history_for_scripthash(sh) + return await self.interface.get_history_for_spk(spk) @best_effort_reliable @catch_server_exceptions - async def listunspent_for_scripthash(self, sh: str) -> List[dict]: + async def listunspent_for_spk(self, spk: str) -> List[dict]: if self.interface is None: # handled by best_effort_reliable raise RequestTimedOut() - return await self.interface.listunspent_for_scripthash(sh) + return await self.interface.listunspent_for_spk(spk) @best_effort_reliable @catch_server_exceptions - async def get_balance_for_scripthash(self, sh: str) -> dict: + async def get_balance_for_spk(self, spk: str) -> dict: if self.interface is None: # handled by best_effort_reliable raise RequestTimedOut() - return await self.interface.get_balance_for_scripthash(sh) + return await self.interface.get_balance_for_spk(spk) @best_effort_reliable @catch_server_exceptions diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index d5e69fb3c7f..414167d0214 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -623,7 +623,7 @@ def get_swap(self, payment_hash: bytes) -> Optional[SwapData]: def add_lnwatcher_callback(self, swap: SwapData) -> None: callback = lambda: self._claim_swap(swap) - self.lnwatcher.add_callback(swap.lockup_address, callback) + self.lnwatcher.add_address_callback(swap.lockup_address, callback) async def hold_invoice_callback(self, payment_hash: bytes) -> None: # note: this assumes the wallet has been unlocked diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index ed7369d19dd..d8be6b23e1f 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -32,10 +32,10 @@ from . import util from .transaction import Transaction, PartialTransaction -from .util import make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy, OldTaskGroup -from .bitcoin import address_to_scripthash, is_address, neuter_bitcoin_address +from .util import make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy, OldTaskGroup, log_exceptions +from .bitcoin import address_to_script, script_to_scripthash, is_address, neuter_bitcoin_address from .logging import Logger -from .interface import GracefulDisconnect, NetworkTimeout +from .interface import GracefulDisconnect, NetworkTimeout, HistoryTooLong, assert_hash256_str if TYPE_CHECKING: from .network import Network @@ -66,27 +66,36 @@ def __init__(self, network: 'Network'): def _reset(self): super()._reset() self._adding_addrs = set() + self._adding_outpoints = set() self.requested_addrs = set() + self.requested_outpoints = set() self._handling_addr_statuses = set() + self._handling_outpoint_statuses = set() self.scripthash_to_address = {} self._processed_some_notifications = False # so that we don't miss them # Queues - self.status_queue = asyncio.Queue() + self.address_status_queue = asyncio.Queue() + self.outpoint_status_queue = asyncio.Queue() async def _run_tasks(self, *, taskgroup): await super()._run_tasks(taskgroup=taskgroup) try: async with taskgroup as group: - await group.spawn(self.handle_status()) + await group.spawn(self.handle_address_status()) + await group.spawn(self.handle_outpoint_status()) await group.spawn(self.main()) finally: # we are being cancelled now - self.session.unsubscribe(self.status_queue) + self.session.unsubscribe(self.address_status_queue) + self.session.unsubscribe(self.outpoint_status_queue) - def add(self, addr: str) -> None: + def add_address(self, addr: str) -> None: if not is_address(addr): raise ValueError(f"invalid bitcoin address {neuter_bitcoin_address(addr)}") self._adding_addrs.add(addr) # this lets is_up_to_date already know about addr + def add_outpoint(self, outpoint: str) -> None: + self._adding_outpoints.add(outpoint) # this lets is_up_to_date already know about outpoint + async def _add_address(self, addr: str): try: if not is_address(addr): raise ValueError(f"invalid bitcoin address {neuter_bitcoin_address(addr)}") @@ -96,34 +105,70 @@ async def _add_address(self, addr: str): finally: self._adding_addrs.discard(addr) # ok for addr not to be present + async def _add_outpoint(self, outpoint: str): + try: + if outpoint in self.requested_outpoints: return + self.requested_outpoints.add(outpoint) + await self.taskgroup.spawn(self._subscribe_to_outpoint, outpoint) + finally: + self._adding_outpoints.discard(outpoint) # ok for addr not to be present + async def _on_address_status(self, addr: str, status: Optional[str]): """Handle the change of the status of an address. Should remove addr from self._handling_addr_statuses when done. """ raise NotImplementedError() # implemented by subclasses + async def _on_outpoint_status(self, txid: str, index:int, status: dict): + raise NotImplementedError() # implemented by subclasses + async def _subscribe_to_address(self, addr): - h = address_to_scripthash(addr) + spk = address_to_script(addr) + h = script_to_scripthash(spk) self.scripthash_to_address[h] = addr self._requests_sent += 1 try: async with self._network_request_semaphore: - await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) + await self.session.subscribe('blockchain.scriptpubkey.subscribe', [spk.hex()], self.address_status_queue) + except HistoryTooLong: + self.logger.info(f'history too long') + return except RPCError as e: - if e.message == 'history too large': # no unique error code - raise GracefulDisconnect(e, log_level=logging.ERROR) from e raise self._requests_answered += 1 - async def handle_status(self): + async def _subscribe_to_outpoint(self, outpoint): + self._requests_sent += 1 + txhash, idx = outpoint.split(':') + idx = int(idx) + self.logger.info(f'subscribe to outpoint: {txhash}:{idx}') + try: + async with self._network_request_semaphore: + await self.session.subscribe('blockchain.outpoint.subscribe', [txhash, idx], self.outpoint_status_queue) + except RPCError as e: + raise + self._requests_answered += 1 + + @log_exceptions + async def handle_address_status(self): while True: - h, status = await self.status_queue.get() + h, status = await self.address_status_queue.get() addr = self.scripthash_to_address[h] self._handling_addr_statuses.add(addr) self.requested_addrs.discard(addr) # ok for addr not to be present await self.taskgroup.spawn(self._on_address_status, addr, status) self._processed_some_notifications = True + @log_exceptions + async def handle_outpoint_status(self): + while True: + txid, index, status = await self.outpoint_status_queue.get() + outpoint = txid + ':%d'%index + self._handling_outpoint_statuses.add(outpoint) + self.requested_outpoints.discard(outpoint) # ok for addr not to be present + await self.taskgroup.spawn(self._on_outpoint_status, txid, index, status) + self._processed_some_notifications = True + async def main(self): raise NotImplementedError() # implemented by subclasses @@ -151,6 +196,9 @@ def diagnostic_name(self): return self.adb.diagnostic_name() def is_up_to_date(self): + return self.addresses_up_to_date() and self.outpoints_up_to_date() + + def addresses_up_to_date(self): return (self._init_done and not self._adding_addrs and not self.requested_addrs @@ -158,7 +206,15 @@ def is_up_to_date(self): and not self.requested_histories and not self.requested_tx and not self._stale_histories - and self.status_queue.empty()) + and self.address_status_queue.empty()) + + def outpoints_up_to_date(self): + return (self._init_done + and not self._adding_outpoints + and not self.requested_outpoints + and not self._handling_outpoint_statuses + and not self.requested_tx + and self.outpoint_status_queue.empty()) async def _maybe_request_history_for_addr(self, addr: str, *, ann_status: Optional[str]) -> List[dict]: # First opportunistically try to guess the addr history. Might save us network requests. @@ -169,13 +225,18 @@ def guess_height(old_height: int) -> int: return old_height guessed_history = [(txid, guess_height(old_height)) for (txid, old_height) in old_history] if history_status(guessed_history) == ann_status: - self.logger.debug(f"managed to guess new history for {addr}. won't call 'blockchain.scripthash.get_history'.") + self.logger.debug(f"managed to guess new history for {addr}. won't call 'blockchain.scriptpubkey.get_history'.") return [{"height": height, "tx_hash": txid} for (txid, height) in guessed_history] # request addr history from server - sh = address_to_scripthash(addr) + spk = address_to_script(addr) + sh = script_to_scripthash(spk) self._requests_sent += 1 async with self._network_request_semaphore: - result = await self.interface.get_history_for_scripthash(sh) + try: + result = await self.interface.get_history_for_spk(spk.hex()) + except HistoryTooLong: + self.logger.info(f"history too long") + return [] self._requests_answered += 1 self.logger.info(f"receiving history {addr} {len(result)}") return result @@ -215,15 +276,47 @@ async def disconnect_if_still_stale(): # Store received history self.adb.receive_history_callback(addr, hist, tx_fees) # Request transactions we don't have - await self._request_missing_txs(hist) + await self._request_txs_from_history(hist) # Remove request; this allows up_to_date to be True self.requested_histories.discard((addr, status)) - async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False): + async def _on_outpoint_status(self, txid: str, index: int, status: dict): + try: + assert isinstance(status, dict) + assert_hash256_str(txid) + txs = set() + txs.add(txid) + height = status.get('height') + if height is not None: + assert isinstance(height, int) + # spv the input + self.adb.add_unverified_or_unconfirmed_tx(txid, height) + # fetch the output + spender_txid = status.get('spender_txhash') + if spender_txid is not None: + assert_hash256_str(spender_txid) + spender_height = status.get('spender_height') + assert isinstance(spender_height, int) + self.adb.add_unverified_or_unconfirmed_tx(spender_txid, spender_height) + txs.add(spender_txid) + await self._request_missing_txs(txs, allow_server_not_finding_tx=False) + finally: + outpoint = txid + ':%d'%index + self._handling_outpoint_statuses.discard(outpoint) + + + async def _request_txs_from_history(self, hist, *, allow_server_not_finding_tx=False): # "hist" is a list of [tx_hash, tx_height] lists - transaction_hashes = [] + txs = set() for tx_hash, _tx_height in hist: + txs.add(tx_hash) + await self._request_missing_txs(txs, allow_server_not_finding_tx=allow_server_not_finding_tx) + + @log_exceptions + async def _request_missing_txs(self, txs, *, allow_server_not_finding_tx=False): + transaction_hashes = [] + for tx_hash in txs: if tx_hash in self.requested_tx: continue tx = self.adb.db.get_transaction(tx_hash) @@ -267,10 +360,13 @@ async def main(self): # Old electrum servers returned ['*'] when all history for the address # was pruned. This no longer happens but may remain in old wallets. if history == ['*']: continue - await self._request_missing_txs(history, allow_server_not_finding_tx=True) + await self._request_txs_from_history(history, allow_server_not_finding_tx=True) # add addresses to bootstrap for addr in random_shuffled_copy(self.adb.get_addresses()): await self._add_address(addr) + # add outpoints to bootstrap (race) + for outpoint in self.adb._subscribed_outpoints: + await self._add_outpoint(outpoint) # main loop self._init_done = True prev_uptodate = False @@ -278,6 +374,8 @@ async def main(self): await asyncio.sleep(0.1) for addr in self._adding_addrs.copy(): # copy set to ensure iterator stability await self._add_address(addr) + for outpoint in list(self._adding_outpoints): + await self._add_outpoint(outpoint) up_to_date = self.adb.is_up_to_date() # see if status changed if (up_to_date != prev_uptodate @@ -311,7 +409,7 @@ async def start_watching_addr(self, addr: str, url: str): async def stop_watching_addr(self, addr: str): self.watched_addresses.pop(addr, None) - # TODO blockchain.scripthash.unsubscribe + # TODO blockchain.scriptpubkey.unsubscribe async def _on_address_status(self, addr, status): if addr not in self.watched_addresses: diff --git a/electrum/version.py b/electrum/version.py index 3b8d9b986ba..fa5d74b85e9 100644 --- a/electrum/version.py +++ b/electrum/version.py @@ -1,7 +1,7 @@ ELECTRUM_VERSION = '4.7.2' # version of the client package -PROTOCOL_VERSION_MIN = '1.4' # electrum protocol -PROTOCOL_VERSION_MAX = '1.6' +PROTOCOL_VERSION_MIN = '1.7' # electrum protocol +PROTOCOL_VERSION_MAX = '1.7' # The hash of the mnemonic seed must begin with this SEED_PREFIX = '01' # Standard wallet diff --git a/electrum/wallet.py b/electrum/wallet.py index 90a004084d0..bb7acb66b6a 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -109,14 +109,13 @@ async def _append_utxos_to_inputs( imax: int, ) -> None: script = script_descriptor.expand().output_script - scripthash = bitcoin.script_to_scripthash(script) async def append_single_utxo(item): prev_tx_raw = await network.get_transaction(item['tx_hash']) prev_tx = Transaction(prev_tx_raw) prev_txout = prev_tx.outputs()[item['tx_pos']] - if scripthash != bitcoin.script_to_scripthash(prev_txout.scriptpubkey): - raise Exception('scripthash mismatch when sweeping') + if script != prev_txout.scriptpubkey: + raise Exception('script mismatch when sweeping') prevout_str = item['tx_hash'] + ':%d' % item['tx_pos'] prevout = TxOutpoint.from_str(prevout_str) txin = PartialTxInput(prevout=prevout) @@ -125,7 +124,7 @@ async def append_single_utxo(item): txin.script_descriptor = script_descriptor inputs.append(txin) - u = await network.listunspent_for_scripthash(scripthash) + u = await network.listunspent_for_spk(script.hex()) async with OldTaskGroup() as group: for item in u: if len(inputs) >= imax: diff --git a/tests/test_interface.py b/tests/test_interface.py index f3c7573d89a..3448a2fd182 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -94,7 +94,6 @@ def blockchain(self) -> Blockchain: def get_local_height(self) -> int: return self.blockchain().height() - class TestInterface(ElectrumTestCase): REGTEST = True @@ -169,16 +168,16 @@ async def test_transaction_broadcast(self): self.assertEqual(self._get_server_session()._method_counts["blockchain.transaction.get"], 0) async def test_dont_request_gethistory_if_status_change_results_from_mempool_txs_simply_getting_mined(self): - """After a new block is mined, we recv "blockchain.scripthash.subscribe" notifs. + """After a new block is mined, we recv "blockchain.scriptpubkey.subscribe" notifs. We opportunistically guess the scripthash status changed purely because touching mempool txs just got mined. - If the guess is correct, we won't call the "blockchain.scripthash.get_history" RPC. + If the guess is correct, we won't call the "blockchain.scriptpubkey.get_history" RPC. """ interface = await self._start_iface_and_wait_for_sync() server_blockheight = interface.tip w1 = restore_wallet_from_text__for_unittest("9dk", path=None, config=self.config)['wallet'] # type: Abstract_Wallet w1.start_network(self.network) await w1.up_to_date_changed_event.wait() - self.assertEqual(self._get_server_session()._method_counts["blockchain.scripthash.get_history"], 0) + self.assertEqual(self._get_server_session()._method_counts["blockchain.scriptpubkey.get_history"], 0) # fund w1 (in mempool) w1_addr = w1.get_receiving_address() funding_tx = await self._toyserver.ask_faucet([TxOutput.from_address_and_value(w1_addr, 1 * COIN)]) @@ -186,7 +185,7 @@ async def test_dont_request_gethistory_if_status_change_results_from_mempool_txs await w1.up_to_date_changed_event.wait() while not w1.is_up_to_date(): await w1.up_to_date_changed_event.wait() - self.assertEqual(self._get_server_session()._method_counts["blockchain.scripthash.get_history"], 1) + self.assertEqual(self._get_server_session()._method_counts["blockchain.scriptpubkey.get_history"], 1) self.assertEqual( w1.adb.get_address_history(w1_addr), {funding_txid: 0}) @@ -197,7 +196,7 @@ async def test_dont_request_gethistory_if_status_change_results_from_mempool_txs while not w1.is_up_to_date(): await w1.up_to_date_changed_event.wait() # see if we managed to guess new history, and hence did not need to call get_history RPC - self.assertEqual(self._get_server_session()._method_counts["blockchain.scripthash.get_history"], 1) + self.assertEqual(self._get_server_session()._method_counts["blockchain.scriptpubkey.get_history"], 1) self.assertEqual( w1.adb.get_address_history(w1_addr), {funding_txid: server_blockheight}) diff --git a/tests/test_wallet_vertical.py b/tests/test_wallet_vertical.py index c0e26e0ddab..fb2a959b25c 100644 --- a/tests/test_wallet_vertical.py +++ b/tests/test_wallet_vertical.py @@ -2471,7 +2471,8 @@ async def test_cpfp_p2wpkh(self, mock_save_db): async def test_sweep_uncompressed_p2pk(self): class NetworkMock: relay_fee = 1000 - async def listunspent_for_scripthash(self, scripthash): + async def listunspent_for_spk(self, spk): + scripthash = bitcoin.script_to_scripthash(bytes.fromhex(spk)) if scripthash == '460e4fb540b657d775d84ff4955c9b13bd954c2adc26a6b998331343f85b6a45': return [{'tx_hash': 'ac24de8b58e826f60bd7b9ba31670bdfc3e8aedb2f28d0e91599d741569e3429', 'tx_pos': 1, 'height': 1325785, 'value': 1000000}] else: @@ -2496,7 +2497,8 @@ async def get_transaction(self, txid): async def test_sweep_compressed_p2pk(self): class NetworkMock: relay_fee = 1000 - async def listunspent_for_scripthash(self, scripthash): + async def listunspent_for_spk(self, spk): + scripthash = bitcoin.script_to_scripthash(bytes.fromhex(spk)) if scripthash == 'cc911adb9fb939d0003a138ebdaa5195bf1d6f9172e438309ab4c00a5ebc255b': return [{'tx_hash': '84a4a1943f7a620e0d8413f4c10877000768797a93bb106b3e7cd6fccc59b35e', 'tx_pos': 1, 'height': 2420005, 'value': 111111}] else: @@ -2521,7 +2523,8 @@ async def get_transaction(self, txid): async def test_sweep_uncompressed_p2pkh(self): class NetworkMock: relay_fee = 1000 - async def listunspent_for_scripthash(self, scripthash): + async def listunspent_for_spk(self, spk): + scripthash = bitcoin.script_to_scripthash(bytes.fromhex(spk)) if scripthash == '71e8c6a9fd8ab498290d5ccbfe1cfe2c5dc2a389b4c036dd84e305a59c4a4d53': return [{'tx_hash': '15a78cc7664c42f1040474763bf794d555f6092bfba97d6c276f296c2d141506', 'tx_pos': 0, 'height': -1, 'value': 222222}] else: @@ -2546,7 +2549,8 @@ async def get_transaction(self, txid): async def test_sweep_compressed_p2pkh(self): class NetworkMock: relay_fee = 1000 - async def listunspent_for_scripthash(self, scripthash): + async def listunspent_for_spk(self, spk): + scripthash = bitcoin.script_to_scripthash(bytes.fromhex(spk)) if scripthash == '941b2ca8bd850e391abc5e024c83b773842c40268a8fa8a5ef7aeca19fb395c5': return [{'tx_hash': '8a764102b4a5c5d1b5235e6ce7e67ed3c146130f8a52e7692a151e2e5a831767', 'tx_pos': 0, 'height': -1, 'value': 123456}] else: @@ -2571,7 +2575,8 @@ async def get_transaction(self, txid): async def test_sweep_p2wpkh_p2sh(self): class NetworkMock: relay_fee = 1000 - async def listunspent_for_scripthash(self, scripthash): + async def listunspent_for_spk(self, spk): + scripthash = bitcoin.script_to_scripthash(bytes.fromhex(spk)) if scripthash == '9ee9bddbe9dc47f7f6c5a652a09012f49dfc54d5b997f58d7ccc49040871e61b': return [{'tx_hash': '9a7bf98ed72b1002559d3d61805838a00e94afec78b8597a68606e2a0725171d', 'tx_pos': 0, 'height': -1, 'value': 150000}] else: @@ -2596,7 +2601,8 @@ async def get_transaction(self, txid): async def test_sweep_p2wpkh(self): class NetworkMock: relay_fee = 1000 - async def listunspent_for_scripthash(self, scripthash): + async def listunspent_for_spk(self, spk): + scripthash = bitcoin.script_to_scripthash(bytes.fromhex(spk)) if scripthash == '7630f6b2121336279b55e5b71d4a59be5ffa782e86bae249ba0b5ad6a791933f': return [{'tx_hash': '01d76acdb8992f4262fb847f5efbd95ea178049be59c70a2851bdcf9b4ae28e3', 'tx_pos': 0, 'height': 2420006, 'value': 98300}] else: diff --git a/tests/toyserver/test_toyserver.py b/tests/toyserver/test_toyserver.py index 63c469c5793..d2a39a240b1 100644 --- a/tests/toyserver/test_toyserver.py +++ b/tests/toyserver/test_toyserver.py @@ -165,8 +165,8 @@ async def test_mempool_replacement(self): await self.toyserver.mempool_add_tx(tx1d) self.assertEqual(self.toyserver.get_mempool_txids(), set()) - async def test_sort_order_of_scripthash_get_history(self): - """txs touching a sh, as returned by 'blockchain.scripthash.get_history', must be in a canonical order""" + async def test_sort_order_of_spk_get_history(self): + """txs touching a sh, as returned by 'blockchain.scriptpubkey.get_history', must be in a canonical order""" # create a "gateway" wallet with many UTXOs, so later it can send without chaining unconfirmed txs w_gateway = restore_wallet_from_text__for_unittest( "9dk", passphrase="gateway", gap_limit=10, path=None, config=self.config)['wallet'] # type: Abstract_Wallet @@ -236,13 +236,13 @@ async def send_1btc_from_gateway_to_target(addr) -> 'Transaction': await self.toyserver.mempool_add_tx(tx9) self.assertEqual(len(self.toyserver.get_mempool_txids()), 5) - # finally, validate "blockchain.scripthash.get_history" sort order - sh_history = self.toyserver.calc_sh_history(bitcoin.address_to_scripthash(w_addr0)) - self.assertEqual(len(sh_history), 9) + # finally, validate "blockchain.scriptpubkey.get_history" sort order + spk_history = self.toyserver.calc_spk_history(bitcoin.address_to_script(w_addr0).hex()) + self.assertEqual(len(spk_history), 9) tx123_A, tx123_B, tx123_C = sorted([tx1.txid(), tx2.txid(), tx3.txid()], key=lambda x: self.toyserver.block_height_and_pos_from_txid(x)) tx567_A, tx567_B, tx567_C = sorted([tx5.txid(), tx6.txid(), tx7.txid()]) tx89_A, tx89_B = sorted([tx8.txid(), tx9.txid()]) - self.assertEqual(sh_history, [ + self.assertEqual(spk_history, [ (tx123_A, self.toyserver.cur_height - 1), (tx123_B, self.toyserver.cur_height - 1), (tx123_C, self.toyserver.cur_height - 1), diff --git a/tests/toyserver/toyserver.py b/tests/toyserver/toyserver.py index 5e38aea2bf8..4db7aecb70e 100644 --- a/tests/toyserver/toyserver.py +++ b/tests/toyserver/toyserver.py @@ -87,8 +87,8 @@ def __init__(self): self._blocks = [FakeBlock(header=REGTEST_GENESIS_HEADER)] # type: list[FakeBlock] # indexes: - self.sh_to_funding_txids = collections.defaultdict(set) # type: dict[str, set[str]] - self.sh_to_spending_txids = collections.defaultdict(set) # type: dict[str, set[str]] + self.spk_to_funding_txids = collections.defaultdict(set) # type: dict[str, set[str]] + self.spk_to_spending_txids = collections.defaultdict(set) # type: dict[str, set[str]] self.txs = {} # type: dict[str, bytes] # txid->raw_tx self._cache_blockheight_and_pos_from_txid = {} # type: dict[str, tuple[int, int]] self.txo_to_spender_txid = {} # type: dict[TxOutpoint, str | None] # also contains UTXOs @@ -163,8 +163,8 @@ def _has_unconfirmed_inputs(self, txid: str) -> bool: tx = Transaction(self.txs[txid]) return any(self.block_height_from_txid(txin.prevout.txid.hex()) is None for txin in tx.inputs()) - def calc_sh_history(self, sh: str) -> Sequence[tuple[str, int]]: - txids = self.sh_to_funding_txids[sh] | self.sh_to_spending_txids[sh] + def calc_spk_history(self, spk: str) -> Sequence[tuple[str, int]]: + txids = self.spk_to_funding_txids[spk] | self.spk_to_spending_txids[spk] hist1 = [] for txid in txids: bh_and_pos = self.block_height_and_pos_from_txid(txid) @@ -182,31 +182,29 @@ def sort_key(x): hist2 = [(txid, bh) for (txid, (bh, pos)) in hist1] return hist2 - def _get_funded_and_spent_scripthashes(self, tx: Transaction) -> tuple[set[str], set[str]]: - """Returns scripthashes touched by tx.""" + def _get_funded_and_spent_spks(self, tx: Transaction) -> tuple[set[str], set[str]]: + """Returns scriptpubkeys touched by tx.""" txid = tx.txid() assert txid - funded_sh = set() + funded_spks = set() for txout in tx.outputs(): - sh = script_to_scripthash(txout.scriptpubkey) - funded_sh.add(sh) - spent_sh = set() + funded_spks.add(txout.scriptpubkey.hex()) + spent_spks = set() for txin in tx.inputs(): if txin.is_coinbase_input(): continue parent_tx_raw = self.txs[txin.prevout.txid.hex()] # parent must not be missing! parent_tx = Transaction(parent_tx_raw) ptxout = parent_tx.outputs()[txin.prevout.out_idx] - sh = script_to_scripthash(ptxout.scriptpubkey) - spent_sh.add(sh) - return funded_sh, spent_sh + spent_spks.add(ptxout.scriptpubkey.hex()) + return funded_spks, spent_spks def _add_tx(self, tx: Transaction) -> set[str]: txid = tx.txid() assert txid if txid in self.txs: # already added - funded_sh, spent_sh = self._get_funded_and_spent_scripthashes(tx) - return funded_sh | spent_sh + funded_spks, spent_spks = self._get_funded_and_spent_spks(tx) + return funded_spks | spent_spks # we forbid conflicting txs. for mempool replacement, the caller must already have rm-ed the conflicts. conflict_txids = self._get_transitive_conflict_txids(tx) assert not conflict_txids, "tx conflict" @@ -230,13 +228,13 @@ def _add_tx(self, tx: Transaction) -> set[str]: raise Exception("TXO already marked as spent by same txid?") else: raise Exception(f"cannot double-spend UTXO: {txin.prevout}. conflict: {txid} vs {double_spender_txid}") - # update touched scripthashes - funded_sh, spent_sh = self._get_funded_and_spent_scripthashes(tx) - for sh in funded_sh: - self.sh_to_funding_txids[sh].add(txid) - for sh in spent_sh: - self.sh_to_spending_txids[sh].add(txid) - return funded_sh | spent_sh + # update touched scriptpubkeys + funded_spks, spent_spks = self._get_funded_and_spent_spks(tx) + for spk in funded_spks: + self.spk_to_funding_txids[spk].add(txid) + for spk in spent_spks: + self.spk_to_spending_txids[spk].add(txid) + return funded_spks | spent_spks def _remove_tx_that_has_no_children(self, tx: Transaction) -> set[str]: txid = tx.txid() @@ -255,24 +253,24 @@ def _remove_tx_that_has_no_children(self, tx: Transaction) -> set[str]: continue assert self.txo_to_spender_txid[txin.prevout] == txid self.txo_to_spender_txid[txin.prevout] = None - # update touched scripthashes - funded_sh, spent_sh = self._get_funded_and_spent_scripthashes(tx) - for sh in funded_sh: - self.sh_to_funding_txids[sh].discard(txid) - for sh in spent_sh: - self.sh_to_spending_txids[sh].discard(txid) - return funded_sh | spent_sh + # update touched spks + funded_spks, spent_spks = self._get_funded_and_spent_spks(tx) + for spk in funded_spks: + self.spk_to_funding_txids[spk].discard(txid) + for spk in spent_spks: + self.spk_to_spending_txids[spk].discard(txid) + return funded_spks | spent_spks def _remove_tx_and_all_children(self, tx: Transaction) -> set[str]: txid = tx.txid() assert txid assert txid in self.txs, "unknown tx" children = self._get_transitive_children_txids(txid) - touched_sh = set() + touched_spks = set() for txid in children: tx = Transaction(self.txs[txid]) - touched_sh |= self._remove_tx_that_has_no_children(tx) - return touched_sh + touched_spks |= self._remove_tx_that_has_no_children(tx) + return touched_spks def _get_direct_children_txids(self, txid: str) -> Sequence[str]: res = [] @@ -320,7 +318,7 @@ def _get_fee_sat_paid_by_tx(self, tx: Transaction) -> int: return input_sum - tx.output_value() async def mempool_add_tx(self, newtx: Transaction) -> None: - touched_sh = set() + touched_spks = set() conflict_txids = self._get_transitive_conflict_txids(newtx, include_self=False) conflict_txs = self._txs_from_txids(conflict_txids) if conflict_txids: @@ -340,22 +338,22 @@ async def mempool_add_tx(self, newtx: Transaction) -> None: # rm conflicts for tx in conflict_txs: if tx.txid() in self.txs: # might already be removed in an earlier loop iter - touched_sh = self._remove_tx_and_all_children(tx) + touched_spks = self._remove_tx_and_all_children(tx) # no more conflicts. add new tx. - touched_sh |= self._add_tx(newtx) + touched_spks |= self._add_tx(newtx) # notify clients for session in self.sessions: - await session.server_send_notifications(touched_sh=touched_sh) + await session.server_send_notifications(touched_spks=touched_spks) async def mempool_rm_tx(self, tx: Transaction) -> None: txid = tx.txid() assert txid assert txid in self.txs, "unknown tx" assert self.block_height_from_txid(txid) is None, "tx already mined" - touched_sh = self._remove_tx_and_all_children(tx) + touched_spks = self._remove_tx_and_all_children(tx) # notify clients for session in self.sessions: - await session.server_send_notifications(touched_sh=touched_sh) + await session.server_send_notifications(touched_spks=touched_spks) async def mine_block( self, @@ -392,12 +390,12 @@ async def mine_block( new_block = FakeBlock(header=new_header, txids=tuple(tx.txid() for tx in txs)) self._blocks.append(new_block) # process txs - touched_sh = set() + touched_spks = set() for tx in txs: - touched_sh |= self._add_tx(tx) + touched_spks |= self._add_tx(tx) # notify clients for session in self.sessions: - await session.server_send_notifications(touched_sh=touched_sh, height_changed=True) + await session.server_send_notifications(touched_spks=touched_spks, height_changed=True) return new_block, coinbase_tx async def unmine_block(self) -> None: @@ -408,14 +406,14 @@ async def unmine_block(self) -> None: # process txs # note: all txs in that block are now automatically considered to be in-mempool. # no need to call _remove_tx -- that would also rm them from the mempool. - touched_sh = set() + touched_spks = set() for txid in block.txids: tx = Transaction(self.txs[txid]) - funded_sh, spent_sh = self._get_funded_and_spent_scripthashes(tx) - touched_sh |= funded_sh | spent_sh + funded_spks, spent_spks = self._get_funded_and_spent_spks(tx) + touched_spks |= funded_spks | spent_spks # notify clients for session in self.sessions: - await session.server_send_notifications(touched_sh=touched_sh, height_changed=True) + await session.server_send_notifications(touched_spks=touched_spks, height_changed=True) async def set_up_faucet(self, *, config: SimpleConfig): assert self._faucet_w is None @@ -454,7 +452,7 @@ def __init__(self, *args, toyserver: ToyServer, **kwargs): self.logger.debug(f'connection from {self.remote_address()}') self.subbed_headers = False self.notified_height = None # type: Optional[int] - self.subbed_scripthashes = set() # type: set[str] + self.subbed_spks = set() # type: set[str] self._method_counts = collections.defaultdict(int) # type: dict[str, int] self.client_name = None self.svr.sessions.add(self) @@ -472,8 +470,8 @@ async def handle_request(self, request): 'blockchain.headers.subscribe': self._handle_headers_subscribe, 'blockchain.block.header': self._handle_block_header, 'blockchain.block.headers': self._handle_block_headers, - 'blockchain.scripthash.subscribe': self._handle_scripthash_subscribe, - 'blockchain.scripthash.get_history': self._handle_scripthash_get_history, + 'blockchain.scriptpubkey.subscribe': self._handle_spk_subscribe, + 'blockchain.scriptpubkey.get_history': self._handle_spk_get_history, 'blockchain.transaction.get': self._handle_transaction_get, 'blockchain.transaction.broadcast': self._handle_transaction_broadcast, 'blockchain.transaction.get_merkle': self._handle_transaction_get_merkle, @@ -487,14 +485,14 @@ async def handle_request(self, request): async def _handle_server_version(self, client_name='', protocol_version=None, *args, **kwargs): self.client_name = client_name - return ['toyserver/0.1', '1.6'] + return ['toyserver/0.1', '1.7'] async def _handle_server_features(self) -> dict: return { 'genesis_hash': constants.net.GENESIS, 'hosts': {"14.3.140.101": {"tcp_port": 51001, "ssl_port": 51002}}, - 'protocol_max': '1.6', - 'protocol_min': '1.6', + 'protocol_max': '1.7', + 'protocol_min': '1.7', 'pruning': None, 'server_version': 'ElectrumX 1.19.0', 'hash_function': 'sha256', @@ -569,27 +567,27 @@ async def _handle_transaction_broadcast(self, raw_tx: str) -> str: raise RPCError(DAEMON_ERROR, str(e)) from e return txid - async def _handle_scripthash_subscribe(self, sh: str) -> Optional[str]: - self.subbed_scripthashes.add(sh) - hist = self.svr.calc_sh_history(sh) + async def _handle_spk_subscribe(self, spk: str) -> Optional[str]: + self.subbed_spks.add(spk) + hist = self.svr.calc_spk_history(spk) return history_status(hist) - async def _handle_scripthash_get_history(self, sh: str) -> Sequence[dict]: - hist_tuples = self.svr.calc_sh_history(sh) + async def _handle_spk_get_history(self, spk: str) -> Sequence[dict]: + hist_tuples = self.svr.calc_spk_history(spk) hist_dicts = [{"height": height, "tx_hash": txid} for (txid, height) in hist_tuples] for hist_dict in hist_dicts: # add "fee" key for mempool txs if hist_dict["height"] in (0, -1,): hist_dict["fee"] = 0 return hist_dicts - async def server_send_notifications(self, *, touched_sh: Iterable[str], height_changed: bool = False) -> None: + async def server_send_notifications(self, *, touched_spks: Iterable[str], height_changed: bool = False) -> None: if height_changed and self.subbed_headers and self.notified_height != self.svr.cur_height: self.notified_height = self.svr.cur_height args = (self._get_headersub_result(),) await self.send_notification('blockchain.headers.subscribe', args) - touched_sh = set(sh for sh in touched_sh if sh in self.subbed_scripthashes) - for sh in touched_sh: - hist = self.svr.calc_sh_history(sh) - args = (sh, history_status(hist)) - await self.send_notification("blockchain.scripthash.subscribe", args) + touched_spks = set(spk for spk in touched_spks if spk in self.subbed_spks) + for spk in touched_spks: + hist = self.svr.calc_spk_history(spk) + args = (script_to_scripthash(bytes.fromhex(spk)), history_status(hist)) + await self.send_notification("blockchain.scriptpubkey.subscribe", args)