diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt index 612d39f1c..e59a5d450 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt @@ -1,8 +1,10 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* +import fr.acinq.lightning.blockchain.fee.FeeratePerByte +import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.blockchain.fee.OnChainFeerates import fr.acinq.lightning.io.TcpSocket -import fr.acinq.lightning.io.linesFlow import fr.acinq.lightning.io.send import fr.acinq.lightning.utils.* import kotlinx.coroutines.* @@ -23,13 +25,14 @@ sealed interface ElectrumClientCommand { sealed interface ElectrumConnectionStatus { data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus object Connecting : ElectrumConnectionStatus - data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus + data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus } class ElectrumClient( socketBuilder: TcpSocket.Builder?, scope: CoroutineScope, - private val loggerFactory: LoggerFactory + private val loggerFactory: LoggerFactory, + defaultExceptionHandler: CoroutineExceptionHandler? = null ) : CoroutineScope by scope, IElectrumClient { private val logger = loggerFactory.newLogger(this::class) @@ -98,9 +101,11 @@ class ElectrumClient( } } - private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception -> - logger.error(exception) { "error starting electrum client: " } - }) { + val exceptionHandler = defaultExceptionHandler ?: CoroutineExceptionHandler { _, exception -> + logger.error(exception) { "error starting electrum client" } + } + + private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) { _connectionStatus.value = ElectrumConnectionStatus.Connecting val socket: TcpSocket = try { val (host, port, tls) = serverAddress @@ -138,22 +143,41 @@ class ElectrumClient( } val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) } - val version = ServerVersion() - sendRequest(version, 0) val rpcFlow = flow.filterIsInstance>().map { it.value } + var requestId = 0 + + val version = ServerVersion() + sendRequest(version, requestId++) val theirVersion = parseJsonResponse(version, rpcFlow.first()) require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" } logger.info { "server version $theirVersion" } - sendRequest(HeaderSubscription, 0) + + sendRequest(HeaderSubscription, requestId++) val header = parseJsonResponse(HeaderSubscription, rpcFlow.first()) require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" } + + suspend fun estimateFee(confirmations: Int): EstimateFeeResponse { + val request = EstimateFees(confirmations) + sendRequest(request, requestId++) + val response = parseJsonResponse(request, rpcFlow.first()) + require(response is EstimateFeeResponse) { "invalid estimatefee response $response" } + return response + } + + val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144)) + logger.info { "onchain fees $fees" } + val feeRates = OnChainFeerates( + fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), + mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), + claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), + fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) + ) _notifications.emit(header) - _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header) + _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates) logger.info { "server tip $header" } // pending requests map val requestMap = mutableMapOf>>() - var requestId = 0 // reset mailbox mailbox.cancel(CancellationException("connection in progress")) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index f43feb44c..862eb5922 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -193,11 +193,11 @@ class Peer( } } launch { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect { - // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED - // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. - // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) - updateEstimateFees() + watcher.client.connectionStatus.filterIsInstance().collect { + // Onchain fees are retrieved once when we establish a connection to an electrum server. + // It is acceptable since the application will typically not be running more than a few minutes at a time. + // (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis) + onChainFeeratesFlow.value = it.onchainFeeRates } } launch { @@ -255,24 +255,6 @@ class Peer( } } - private suspend fun updateEstimateFees() { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first() - val sortedFees = listOf( - watcher.client.estimateFees(2), - watcher.client.estimateFees(6), - watcher.client.estimateFees(18), - watcher.client.estimateFees(144), - ) - logger.info { "on-chain fees: $sortedFees" } - // TODO: If some feerates are null, we may implement a retry - onChainFeeratesFlow.value = OnChainFeerates( - fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), - mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) - ) - } - fun connect() { if (connectionState.value is Connection.CLOSED) establishConnection() else logger.warning { "Peer is already connecting / connected" } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt index 6e9f6c4f7..0336b8164 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt @@ -20,6 +20,18 @@ interface TcpSocket { suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int + fun linesFlow(): Flow { + return flow { + val buffer = ByteArray(8192) + while (true) { + val size = receiveAvailable(buffer) + emit(buffer.subArray(size)) + } + } + .decodeToString() + .splitByLines() + } + suspend fun startTls(tls: TLS): TcpSocket fun close() @@ -69,11 +81,3 @@ suspend fun TcpSocket.receiveAvailable(buffer: ByteArray) = receiveAvailable(buf internal expect object PlatformSocketBuilder : TcpSocket.Builder suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) } - -fun TcpSocket.linesFlow(): Flow = flow { - val buffer = ByteArray(8192) - while (true) { - val size = receiveAvailable(buffer) - emit(buffer.subArray(size)) - } -}.decodeToString().splitByLines() \ No newline at end of file diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt index b68bcd5b4..7bd605ebe 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt @@ -2,15 +2,20 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.io.TcpSocket import fr.acinq.lightning.tests.utils.LightningTestSuite import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.utils.Connection +import fr.acinq.lightning.utils.ServerAddress import fr.acinq.lightning.utils.toByteVector32 import fr.acinq.secp256k1.Hex -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import org.kodein.log.LoggerFactory +import org.kodein.log.newLogger import kotlin.test.* import kotlin.time.Duration.Companion.seconds @@ -177,4 +182,76 @@ class ElectrumClientTest : LightningTestSuite() { client.stop() } + + @OptIn(DelicateCoroutinesApi::class) + @Test + fun `catch coroutine errors`() { + val myCustomError = "this is a test error" + + class MyTcpSocket() : TcpSocket { + val output = MutableSharedFlow() + override suspend fun send(bytes: ByteArray?, offset: Int, length: Int, flush: Boolean) { + if (bytes != null) { + CoroutineScope(Dispatchers.IO).launch { + val encoded = bytes.decodeToString(offset, offset + length) + val request = Json.parseToJsonElement(encoded) + val response = when (request.jsonObject["method"]!!.jsonPrimitive.content) { + "server.version" -> """{"jsonrpc": "2.0", "result": ["ElectrumX 1.15.0", "1.4"], "id": 0}""" + "blockchain.headers.subscribe" -> """{"jsonrpc": "2.0", "result": {"hex": "000080209a35ef4422bc37b0e1c3df9d32cfaaef6a6d31047c0202000000000000000000b9f14c32922d305844c739829ef13df9d188953e74a392720c02eeadd93acbf9ae22a464be8e05174bc5c367", "height": 797144}, "id": 1}""" + "blockchain.estimatefee" -> """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" // we return an error, as if estimatefee had failed + else -> """{"jsonrpc": "2.0", "error": {"code": 43, "message": "unhandled request"}, "id": 2}""" + } + output.emit(response) + } + } + } + + override suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) = TODO("Not yet implemented") + override suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int = TODO("Not yet implemented") + override suspend fun startTls(tls: TcpSocket.TLS): TcpSocket = TODO("Not yet implemented") + override fun close() {} + override fun linesFlow(): Flow = output.asSharedFlow() + } + + class MyBuilder() : TcpSocket.Builder { + override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket { + return MyTcpSocket() + } + } + + val errorFlow = MutableStateFlow(null) + val loggerFactory = LoggerFactory.default + val logger = loggerFactory.newLogger(this::class) + val myErrorHandler = CoroutineExceptionHandler { _, e -> + logger.error(e) { "error caught in custom exception handler" } + errorFlow.value = e + } + + runBlocking(Dispatchers.IO) { + withTimeout(15.seconds) { + val builder = MyBuilder() + // from Kotlin's documentation: + // all children coroutines (coroutines created in the context of another Job) delegate handling of their exceptions to their parent coroutine, which + // also delegates to the parent, and so on until the root, so the CoroutineExceptionHandler installed in their context is never used + // => here we need to create a new root scope otherwise our exception handler will not be used + val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler) + client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above) + errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) } + client.stop() + } + + // if we use runBlocking's scope, our exception handler will not be used + errorFlow.value = null + val error = assertFails { + withTimeout(15.seconds) { + val builder = MyBuilder() + val client = ElectrumClient(builder, this, LoggerFactory.default, myErrorHandler) + client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above) + errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) } + client.stop() + } + } + assertTrue(error.message!!.contains(myCustomError)) + } + } }