Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions electrum/address_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def __init__(self, db: 'WalletDB', config: 'SimpleConfig', *, name: str = None):

self._get_balance_cache = {}
self._get_utxos_cache = {}
self._subscribed_outpoints = set()

self.load_and_cleanup()

Expand Down Expand Up @@ -231,10 +232,20 @@ async def stop(self):
self.network = None

def add_address(self, address: str) -> None:
if address not in self.db.history:
self.db.history[address] = []
if address in self.db.history:
return
self.db.history[address] = []
if self.synchronizer:
self.synchronizer.add(address)
self.synchronizer.add_address(address)
self.up_to_date_changed()

def subscribe_to_outpoint(self, outpoint):
if outpoint in self._subscribed_outpoints:
return
self._subscribed_outpoints.add(outpoint)
if self.synchronizer:
# fixme: launch polling task in synchronizer
self.synchronizer.add_outpoint(outpoint)
self.up_to_date_changed()

@with_lock
Expand Down Expand Up @@ -1064,6 +1075,12 @@ def subscribe_to_outputs(self, spender_txid: str):
if not self.is_mine(o.address):
self.add_address(o.address)

def subscribe_to_tx_outpoints(self, spender_txid: str):
spender_tx = self.get_transaction(spender_txid)
for i, o in enumerate(spender_tx.outputs()):
txo = spender_txid + ':%d'%i
self.subscribe_to_outpoint(txo)

def get_tx_mined_depth(self, txid: str):
if not txid:
return TxMinedDepth.FREE
Expand Down
3 changes: 1 addition & 2 deletions electrum/bip39_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ async def account_has_history(network: 'Network', account_node: BIP32Node, scrip
pubkey = address_node.eckey.get_public_key_hex()
address = bitcoin.pubkey_to_address(script_type, pubkey)
script = bitcoin.address_to_script(address)
scripthash = bitcoin.script_to_scripthash(script)
get_history = network.get_history_for_scripthash(scripthash)
get_history = network.get_history_for_spk(script.hex())
get_history_tasks.append(await group.spawn(get_history))
for task in get_history_tasks:
history = task.result()
Expand Down
1 change: 1 addition & 0 deletions electrum/bitcoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ def address_to_scripthash(addr: str, *, net=None) -> str:


def script_to_scripthash(script: bytes) -> str:
assert type(script) is bytes
h = sha256(script)
return h[::-1].hex()

Expand Down
12 changes: 6 additions & 6 deletions electrum/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ async def getaddresshistory(self, address):

arg:str:address:Bitcoin address
"""
sh = bitcoin.address_to_scripthash(address)
return await self.network.get_history_for_scripthash(sh)
spk = bitcoin.address_to_script(address)
return await self.network.get_history_for_spk(spk.hex())

@command('wp')
async def unlock(self, wallet: Abstract_Wallet = None, password=None):
Expand All @@ -505,8 +505,8 @@ async def getaddressunspent(self, address):

arg:str:address:Bitcoin address
"""
sh = bitcoin.address_to_scripthash(address)
return await self.network.listunspent_for_scripthash(sh)
spk = bitcoin.address_to_script(address)
return await self.network.listunspent_for_spk(spk.hex())

@command('')
async def serialize(self, jsontx):
Expand Down Expand Up @@ -753,8 +753,8 @@ async def getaddressbalance(self, address):

arg:str:address:Bitcoin address
"""
sh = bitcoin.address_to_scripthash(address)
out = await self.network.get_balance_for_scripthash(sh)
spk = bitcoin.address_to_script(address)
out = await self.network.get_balance_for_spk(spk.hex())
out["confirmed"] = format_satoshis(out["confirmed"])
out["unconfirmed"] = format_satoshis(out["unconfirmed"])
return out
Expand Down
43 changes: 24 additions & 19 deletions electrum/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from . import blockchain
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
from . import bitcoin
from .bitcoin import DummyAddress, DummyAddressUsedInTxException
from .bitcoin import DummyAddress, DummyAddressUsedInTxException, script_to_scripthash
from . import constants
from .i18n import _
from .logging import Logger
Expand All @@ -82,6 +82,11 @@
MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE

RPC_ERROR_HISTORY_TOO_LONG = 10001

class HistoryTooLong(Exception):
# we should not close the connection in that case
pass

class NetworkTimeout:
# seconds
Expand Down Expand Up @@ -208,7 +213,10 @@ async def send_request(self, *args, timeout=None, **kwargs):
raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
except CodeMessageError as e:
self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
raise
if e.code == RPC_ERROR_HISTORY_TOO_LONG:
raise HistoryTooLong()
else:
raise
except BaseException as e: # cancellations, etc. are useful for debugging
self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
raise
Expand All @@ -225,14 +233,17 @@ def set_default_timeout(self, timeout):
async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
# note: until the cache is written for the first time,
# each 'subscribe' call might make a request on the network.
key = self.get_hashable_key_for_rpc_call(method, params)
params2 = params[:]
if method == 'blockchain.scriptpubkey.subscribe':
params2[0] = script_to_scripthash(bytes.fromhex(params2[0]))
key = self.get_hashable_key_for_rpc_call(method, params2)
self.subscriptions[key].append(queue)
if key in self.cache:
result = self.cache[key]
else:
result = await self.send_request(method, params)
self.cache[key] = result
await queue.put(params + [result])
await queue.put(params2 + [result])

def unsubscribe(self, queue):
"""Unsubscribe a callback to free object references to enable GC."""
Expand Down Expand Up @@ -1417,7 +1428,7 @@ async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> Non
raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
# broadcast succeeded.
# We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
# the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
# the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)

async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool:
Expand All @@ -1442,16 +1453,14 @@ async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool:
assert success
# broadcast succeeded.
# We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
# the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
# the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
for tx, rawtx in zip(txs, rawtxs):
self._rawtx_cache[tx.txid()] = bytes.fromhex(rawtx)
return True

async def get_history_for_scripthash(self, sh: str) -> List[dict]:
if not is_hash256_str(sh):
raise Exception(f"{repr(sh)} is not a scripthash")
async def get_history_for_spk(self, spk: str) -> List[dict]:
# do request
res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
res = await self.session.send_request('blockchain.scriptpubkey.get_history', [spk])
# check response
assert_list_or_tuple(res)
prev_height = 1
Expand Down Expand Up @@ -1481,14 +1490,12 @@ async def get_history_for_scripthash(self, sh: str) -> List[dict]:
# Either server is sending garbage... or maybe if server is race-prone
# a recently mined tx could be included in both last block and mempool?
# Still, it's simplest to just disregard the response.
raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
raise RequestCorrupted(f"server history has non-unique txids for spk={spk}")
return res

async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
if not is_hash256_str(sh):
raise Exception(f"{repr(sh)} is not a scripthash")
async def listunspent_for_spk(self, spk: str) -> List[dict]:
# do request
res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
res = await self.session.send_request('blockchain.scriptpubkey.listunspent', [spk])
# check response
assert_list_or_tuple(res)
for utxo_item in res:
Expand All @@ -1502,11 +1509,9 @@ async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
assert_hash256_str(utxo_item['tx_hash'])
return res

async def get_balance_for_scripthash(self, sh: str) -> dict:
if not is_hash256_str(sh):
raise Exception(f"{repr(sh)} is not a scripthash")
async def get_balance_for_spk(self, spk: str) -> dict:
# do request
res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
res = await self.session.send_request('blockchain.scriptpubkey.get_balance', [spk])
# check response
assert_dict_contains_field(res, field_name='confirmed')
assert_dict_contains_field(res, field_name='unconfirmed')
Expand Down
6 changes: 3 additions & 3 deletions electrum/lnpeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1631,10 +1631,10 @@ async def reestablish_channel(self, chan: Channel):
# We should let them know:
self._send_channel_reestablish(chan)
return
adb = self.lnworker.wallet.adb
if self.network.blockchain().is_tip_stale() \
or not self.lnworker.wallet.is_up_to_date() \
or self.lnworker.current_target_feerate_per_kw(has_anchors=chan.has_anchors()) \
is None:
or (adb.synchronizer and not adb.synchronizer.outpoints_up_to_date()) \
or self.lnworker.current_target_feerate_per_kw(has_anchors=chan.has_anchors()) is None:
# don't try to reestablish until we can do fee estimation and are up-to-date
return
# if we get here, we will try to do a proper reestablish
Expand Down
40 changes: 26 additions & 14 deletions electrum/lnwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ def start_network(self, network: 'Network'):
def stop(self):
self.unregister_callbacks()

def remove_callback(self, address: str) -> None:
self.callbacks.pop(address, None)
def remove_callback(self, outpoint: str) -> None:
self.callbacks.pop(outpoint, None)

def add_callback(
def add_address_callback(
self,
address: str,
callback: Callable[[], Awaitable[None]],
Expand All @@ -57,6 +57,17 @@ def add_callback(
self.adb.add_address(address)
self.callbacks[address] = callback

def add_outpoint_callback(
self,
outpoint: str,
callback: Callable[[], Awaitable[None]],
*,
subscribe: bool = True,
) -> None:
if subscribe:
self.adb.subscribe_to_outpoint(outpoint)
self.callbacks[outpoint] = callback

async def trigger_callbacks(self, *, requires_synchronizer: bool = True):
if requires_synchronizer and not self.adb.synchronizer:
self.logger.info("synchronizer not set yet")
Expand Down Expand Up @@ -90,33 +101,32 @@ async def on_event_adb_added_verified_tx(self, adb, tx_hash):
async def on_event_adb_set_up_to_date(self, adb):
if adb != self.adb:
return
# fixme: use separate events for addresses and outpoints
await self.trigger_callbacks()

def add_channel(self, chan: 'AbstractChannel') -> None:
outpoint = chan.funding_outpoint.to_str()
address = chan.get_funding_address()
callback = lambda: self.check_onchain_situation(address, outpoint)
self.add_callback(address, callback, subscribe=chan.need_to_subscribe())
callback = lambda: self.check_onchain_situation(outpoint)
self.add_outpoint_callback(outpoint, callback, subscribe=chan.need_to_subscribe())

@ignore_exceptions
@log_exceptions
async def check_onchain_situation(self, address: str, funding_outpoint: str) -> None:
# early return if address has not been added yet
if not self.adb.is_mine(address):
async def check_onchain_situation(self, funding_outpoint: str) -> None:
# early return if funding_outpoint has not been added yet
if funding_outpoint not in self.adb._subscribed_outpoints:
self.logger.info(f"funding outpoint not subscribed yet")
return
# inspect_tx_candidate might have added new addresses, in which case we return early
# note: maybe we should wait until adb.is_up_to_date... (?)
funding_txid = funding_outpoint.split(':')[0]
funding_height = self.adb.get_tx_height(funding_txid)
closing_txid = self.adb.get_spender(funding_outpoint)
closing_height = self.adb.get_tx_height(closing_txid)
if closing_txid:
self.adb.subscribe_to_outputs(closing_txid)
self.adb.subscribe_to_tx_outpoints(closing_txid)
closing_tx = self.adb.get_transaction(closing_txid)
if closing_tx:
keep_watching = await self.sweep_commitment_transaction(funding_outpoint, closing_tx)
if not keep_watching:
self.remove_callback(address)
self.remove_callback(funding_outpoint)
else:
self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...")
keep_watching = True
Expand Down Expand Up @@ -191,7 +201,7 @@ async def sweep_commitment_transaction(self, funding_outpoint: str, closing_tx:
# the spender might be the remote, revoked or not
htlc_sweepinfo = chan.maybe_sweep_htlcs(closing_tx, spender_tx)
if htlc_sweepinfo:
self.adb.subscribe_to_outputs(spender_txid)
self.adb.subscribe_to_tx_outpoints(spender_txid)
for prevout2, htlc_sweep_info in htlc_sweepinfo.items():
self.lnworker.wallet.set_default_label(prevout2, htlc_sweep_info.name)
if isinstance(htlc_sweep_info, KeepWatchingTXO): # haven't yet decided if we want to sweep
Expand Down Expand Up @@ -276,6 +286,8 @@ def maybe_add_accounting_address(self, spender_txid: str, sweep_info: 'SweepInfo
prev_tx = self.adb.get_transaction(prev_txid)
txout = prev_tx.outputs()[int(prev_index)]
self.lnworker.wallet._accounting_addresses.add(txout.address)
# fixme: replace accounting_addresses with accounting_outpoints
self.adb.add_address(txout.address)

def maybe_add_pending_forceclose(
self,
Expand Down
12 changes: 6 additions & 6 deletions electrum/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,24 +1115,24 @@ async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:

@best_effort_reliable
@catch_server_exceptions
async def get_history_for_scripthash(self, sh: str) -> List[dict]:
async def get_history_for_spk(self, spk: str) -> List[dict]:
if self.interface is None: # handled by best_effort_reliable
raise RequestTimedOut()
return await self.interface.get_history_for_scripthash(sh)
return await self.interface.get_history_for_spk(spk)

@best_effort_reliable
@catch_server_exceptions
async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
async def listunspent_for_spk(self, spk: str) -> List[dict]:
if self.interface is None: # handled by best_effort_reliable
raise RequestTimedOut()
return await self.interface.listunspent_for_scripthash(sh)
return await self.interface.listunspent_for_spk(spk)

@best_effort_reliable
@catch_server_exceptions
async def get_balance_for_scripthash(self, sh: str) -> dict:
async def get_balance_for_spk(self, spk: str) -> dict:
if self.interface is None: # handled by best_effort_reliable
raise RequestTimedOut()
return await self.interface.get_balance_for_scripthash(sh)
return await self.interface.get_balance_for_spk(spk)

@best_effort_reliable
@catch_server_exceptions
Expand Down
2 changes: 1 addition & 1 deletion electrum/submarine_swaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ def get_swap(self, payment_hash: bytes) -> Optional[SwapData]:

def add_lnwatcher_callback(self, swap: SwapData) -> None:
callback = lambda: self._claim_swap(swap)
self.lnwatcher.add_callback(swap.lockup_address, callback)
self.lnwatcher.add_address_callback(swap.lockup_address, callback)

async def hold_invoice_callback(self, payment_hash: bytes) -> None:
# note: this assumes the wallet has been unlocked
Expand Down
Loading