From 29cd82ad2a79b95ca0b3c318032e57dd77f31024 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Thu, 17 Mar 2022 16:00:31 +0400 Subject: [PATCH 1/8] [TH2-3249] gRPC interface for codec pipeline --- build.gradle | 3 +- .../exactpro/th2/rptdataprovider/Context.kt | 34 ++-- .../com/exactpro/th2/rptdataprovider/Main.kt | 5 +- .../entities/configuration/Configuration.kt | 8 +- .../entities/internal/PipelineStepObject.kt | 4 +- .../messages/MessageBatchConverter.kt | 5 +- .../handlers/messages/MessageBatchDecoder.kt | 6 +- .../producers/MessageProducer.kt | 14 +- .../services/{rabbitmq => }/CodecRequest.kt | 2 +- .../services/DecoderService.kt | 5 + .../services/grpc/GrpcService.kt | 158 ++++++++++++++++++ .../services/rabbitmq/RabbitMqService.kt | 14 +- 12 files changed, 219 insertions(+), 39 deletions(-) rename src/main/kotlin/com/exactpro/th2/rptdataprovider/services/{rabbitmq => }/CodecRequest.kt (98%) create mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt create mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt diff --git a/build.gradle b/build.gradle index c0472c6a..bae13fc5 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,7 @@ dependencies { implementation 'org.slf4j:slf4j-api' implementation 'org.slf4j:slf4j-log4j12' - implementation 'com.exactpro.th2:common:3.31.0' + implementation 'com.exactpro.th2:common:3.34.0-TH2-3340-1995979126-SNAPSHOT' implementation group: 'net.jpountz.lz4', name: 'lz4', version: '1.3.0' @@ -54,6 +54,7 @@ dependencies { implementation 'com.exactpro.th2:cradle-cassandra:2.20.2' implementation 'com.exactpro.th2:grpc-data-provider:0.1.4' + implementation 'com.exactpro.th2:grpc-codec:0.0.4-TH2-3340-1998223634-SNAPSHOT' implementation 'io.github.microutils:kotlin-logging:1.7.10' diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt index cc482345..319801c2 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,9 @@ package com.exactpro.th2.rptdataprovider import com.exactpro.cradle.CradleManager -import com.exactpro.th2.common.grpc.MessageBatch -import com.exactpro.th2.common.grpc.MessageGroup import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.grpc.RawMessageBatch import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration +import com.exactpro.th2.common.schema.grpc.router.GrpcRouter import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.rptdataprovider.cache.EventCache import com.exactpro.th2.rptdataprovider.cache.MessageCache @@ -44,7 +42,9 @@ import com.exactpro.th2.rptdataprovider.handlers.SearchMessagesHandler import com.exactpro.th2.rptdataprovider.producers.EventProducer import com.exactpro.th2.rptdataprovider.producers.MessageProducer import com.exactpro.th2.rptdataprovider.server.ServerType +import com.exactpro.th2.rptdataprovider.services.DecoderService import com.exactpro.th2.rptdataprovider.services.cradle.CradleService +import com.exactpro.th2.rptdataprovider.services.grpc.GrpcService import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper @@ -52,7 +52,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.http.* @Suppress("MemberVisibilityCanBePrivate") -class Context( +class Context constructor( val configuration: Configuration, val serverType: ServerType, @@ -68,6 +68,9 @@ class Context( val messageRouterRawBatch: MessageRouter, val messageRouterParsedBatch: MessageRouter, + + val grpcRouter: GrpcRouter, + val grpcConfig: GrpcConfiguration, val cradleService: CradleService = CradleService( @@ -75,11 +78,18 @@ class Context( cradleManager ), - val rabbitMqService: RabbitMqService = RabbitMqService( - configuration, - messageRouterParsedBatch, - messageRouterRawBatch - ), + val decoderService: DecoderService = if (configuration.codecUseGrpc.value.toBoolean()) { + GrpcService( + configuration, + grpcRouter + ) + } else { + RabbitMqService( + configuration, + messageRouterParsedBatch, + messageRouterRawBatch + ) + }, val eventProducer: EventProducer = EventProducer(cradleService, jacksonMapper), @@ -87,7 +97,7 @@ class Context( val messageProducer: MessageProducer = MessageProducer( cradleService, - rabbitMqService + decoderService ), val messageCache: MessageCache = MessageCache(configuration, messageProducer), @@ -145,4 +155,4 @@ class Context( } } } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt index 4a34aad9..20972980 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,6 +77,7 @@ class Main { messageRouterParsedBatch = configurationFactory.messageRouterMessageGroupBatch.also { resources += it }, + grpcRouter = configurationFactory.grpcRouter, grpcConfig = configurationFactory.grpcConfiguration ) } @@ -174,4 +175,4 @@ fun main(args: Array) { logger.error(ex) { "Cannot start the box" } exitProcess(1) } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt index f7b7b6ab..fd597356 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ class CustomConfigurationClass { val codecCallbackThreadPool: Int = 10 val codecRequestThreadPool: Int = 1 val codecUsePinAttributes: Boolean = true + val codecUseGrpc: Boolean = false val grpcWriterMessageBuffer: Int = 100 @@ -205,4 +206,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) { val codecUsePinAttributes: Variable = Variable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes.toString(), "true") -} + + val codecUseGrpc: Variable = + Variable("codecUseGrpc", customConfiguration.codecUseGrpc.toString(), "false") +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt index a15c74fd..e4278ddc 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt @@ -18,8 +18,8 @@ package com.exactpro.th2.rptdataprovider.entities.internal import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.rptdataprovider.entities.responses.MessageBatchWrapper -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchResponse +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse import java.time.Instant data class StreamEndObject( diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt index 5ff4f7e1..703b35f9 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt @@ -6,11 +6,10 @@ import com.exactpro.th2.rptdataprovider.Context import com.exactpro.th2.rptdataprovider.entities.internal.PipelineCodecRequest import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest -import com.exactpro.th2.rptdataprovider.entities.responses.MessageBatchWrapper import com.exactpro.th2.rptdataprovider.handlers.PipelineComponent import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus import com.exactpro.th2.rptdataprovider.handlers.StreamName -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -131,4 +130,4 @@ class MessageBatchConverter( sendToChannel(pipelineMessage) } } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt index 726232a8..0ca01331 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt @@ -9,7 +9,7 @@ import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchReques import com.exactpro.th2.rptdataprovider.handlers.PipelineComponent import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus import com.exactpro.th2.rptdataprovider.handlers.StreamName -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchResponse +import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.isActive @@ -106,7 +106,7 @@ class MessageBatchDecoder( pipelineMessage.lastProcessedId, pipelineMessage.lastScannedTime, pipelineMessage.storedBatchWrapper, - context.rabbitMqService.sendToCodec(pipelineMessage.codecRequest), + context.decoderService.sendToCodec(pipelineMessage.codecRequest), protocol ) pipelineStatus.decodeEnd(streamName.toString(), pipelineMessage.codecRequest.protobufRawMessageBatch.groupsCount.toLong()) @@ -125,4 +125,4 @@ class MessageBatchDecoder( sendToChannel(pipelineMessage) } } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt index d58ac647..60195a0d 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,17 +20,15 @@ import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.common.grpc.* import com.exactpro.th2.rptdataprovider.entities.internal.BodyWrapper import com.exactpro.th2.rptdataprovider.entities.internal.Message -import com.exactpro.th2.rptdataprovider.handlers.StreamName import com.exactpro.th2.rptdataprovider.services.cradle.CradleMessageNotFoundException import com.exactpro.th2.rptdataprovider.services.cradle.CradleService -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest -import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.DecoderService class MessageProducer( private val cradle: CradleService, - private val rabbitMqService: RabbitMqService + private val rabbitMqService: DecoderService ) { - suspend fun fromId(id: StoredMessageId): Message { return cradle.getMessageSuspend(id)?.let { stored -> @@ -62,8 +60,6 @@ class MessageProducer( Message(stored, decoded, stored.content, setOf()) } - ?: throw CradleMessageNotFoundException("message '${id}' does not exist in cradle") } -} - +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/CodecRequest.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/CodecRequest.kt similarity index 98% rename from src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/CodecRequest.kt rename to src/main/kotlin/com/exactpro/th2/rptdataprovider/services/CodecRequest.kt index 5a7c56b9..0e5052f8 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/CodecRequest.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/CodecRequest.kt @@ -14,7 +14,7 @@ * limitations under the License. ******************************************************************************/ -package com.exactpro.th2.rptdataprovider.services.rabbitmq +package com.exactpro.th2.rptdataprovider.services import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.MessageGroupBatch diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt new file mode 100644 index 00000000..2aeddf25 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt @@ -0,0 +1,5 @@ +package com.exactpro.th2.rptdataprovider.services + +interface DecoderService { + suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt new file mode 100644 index 00000000..b72d447f --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt @@ -0,0 +1,158 @@ +package com.exactpro.th2.rptdataprovider.services.grpc + +import com.exactpro.th2.codec.grpc.AsyncCodecService +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.message.direction +import com.exactpro.th2.common.message.sequence +import com.exactpro.th2.common.message.sessionAlias +import com.exactpro.th2.common.schema.grpc.router.GrpcRouter +import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration +import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse +import com.exactpro.th2.rptdataprovider.services.CodecRequestId +import com.exactpro.th2.rptdataprovider.services.DecoderService +import com.exactpro.th2.rptdataprovider.services.MessageGroupBatchWrapper +import com.exactpro.th2.rptdataprovider.services.PendingCodecBatchRequest +import io.grpc.stub.StreamObserver +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import mu.KotlinLogging +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors + +class GrpcService( + configuration: Configuration, + grpcRouter: GrpcRouter +) : DecoderService { + + private val responseTimeout = configuration.codecResponseTimeout.value.toLong() + private val pendingRequests = ConcurrentHashMap() + private val usePinAttributes = configuration.codecUsePinAttributes.value.toBoolean() + + private val codecService = grpcRouter.getService(AsyncCodecService::class.java) + private val maximumPendingRequests = configuration.codecPendingBatchLimit.value.toInt() + + private val requestSenderScope = CoroutineScope( + Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() + ) + + private val callbackScope = CoroutineScope( + Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() + ) + + private val codecLatency = PipelineStatus.codecLatency + + private val responseObserver = object : StreamObserver { + override fun onNext(decodedBatch: MessageGroupBatch) { + callbackScope.launch { + + val response = MessageGroupBatchWrapper(decodedBatch) + + LOGGER.trace { "codec response with hash ${response.responseHash} has been received" } + + pendingRequests.remove(response.requestId)?.let { + codecLatency.gaugeDec(listOf(it.streamName)) + codecLatency.setDuration(it.startTimestamp.toDouble(), listOf(it.streamName)) + it.completableDeferred.complete(response) + } + ?: LOGGER.trace { + val firstSequence = + decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence + val lastSequence = + decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence + val stream = + "${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}" + "codec response with hash ${response.responseHash} has no matching requests (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} requestId=${response.requestId})" + } + } + } + + override fun onError(t: Throwable?) = LOGGER.error(t) { "gRPC request failed." } + override fun onCompleted() {} + } + + override suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { + + return withContext(requestSenderScope.coroutineContext) { + while (pendingRequests.keys.size > maximumPendingRequests) { + delay(100) + } + + pendingRequests.computeIfAbsent(request.requestId) { + val pendingRequest = request.toPending() + + callbackScope.launch { + delay(responseTimeout) + + pendingRequest.completableDeferred.let { + if (it.isActive && + pendingRequests[request.requestId]?.completableDeferred == pendingRequest.completableDeferred + ) { + + pendingRequests.remove(request.requestId) + it.complete(null) + + codecLatency.gaugeDec(listOf(request.streamName)) + codecLatency.setDuration( + pendingRequest.startTimestamp.toDouble(), + listOf(request.streamName) + ) + + LOGGER.warn { + val firstSequence = + request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence + val lastSequence = + request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence + val stream = + "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" + + "codec request timed out after $responseTimeout ms (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId}" + } + } + } + } + + try { + if (usePinAttributes) { + val sessionAlias = + request.protobufRawMessageBatch.groupsList + .first().messagesList + .first().rawMessage.metadata.id.connectionId.sessionAlias + + codecLatency.gaugeInc(listOf(request.streamName)) + codecService.decode(request.protobufRawMessageBatch, mapOf("session-alias" to sessionAlias), responseObserver) + } else { + codecService.decode(request.protobufRawMessageBatch, emptyMap(), responseObserver) + } + + LOGGER.trace { + val firstSequence = + request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence + val lastSequence = + request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence + val stream = + "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" + + "codec request with hash ${request.requestHash} has been sent (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId})" + } + LOGGER.debug { "codec request with hash ${request.requestHash.hashCode()} has been sent" } + + } catch (e: Exception) { + pendingRequest.completableDeferred.cancel( + "Unexpected exception while trying to send a codec request", e + ) + } + pendingRequest + }.toResponse() + } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt index 00770153..5fa52b1e 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,12 @@ import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse +import com.exactpro.th2.rptdataprovider.services.CodecRequestId +import com.exactpro.th2.rptdataprovider.services.DecoderService +import com.exactpro.th2.rptdataprovider.services.MessageGroupBatchWrapper +import com.exactpro.th2.rptdataprovider.services.PendingCodecBatchRequest import kotlinx.coroutines.* import mu.KotlinLogging import java.util.concurrent.ConcurrentHashMap @@ -33,10 +39,10 @@ class RabbitMqService( configuration: Configuration, messageRouterParsedBatch: MessageRouter, private val messageRouterRawBatch: MessageRouter -) { +) : DecoderService { companion object { - val logger = KotlinLogging.logger { } + private val logger = KotlinLogging.logger { } } private val responseTimeout = configuration.codecResponseTimeout.value.toLong() @@ -87,7 +93,7 @@ class RabbitMqService( ) } - suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { + override suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { return withContext(mqRequestSenderScope.coroutineContext) { while (pendingRequests.keys.size > maximumPendingRequests) { From 5279d3b22df716c5e19e5ddcb2722f06ef77120c Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Thu, 17 Mar 2022 16:12:34 +0400 Subject: [PATCH 2/8] kotlin version upgrade --- build.gradle | 2 +- .../th2/rptdataprovider/handlers/messages/StreamMerger.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index bae13fc5..5435b890 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ plugins { id 'java' - id 'org.jetbrains.kotlin.jvm' version '1.3.72' + id 'org.jetbrains.kotlin.jvm' version '1.5.32' id 'application' id 'com.palantir.docker' version '0.25.0' } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt index 0ffd8490..565e2383 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt @@ -124,11 +124,11 @@ class StreamMerger( private fun getLastScannedObject(): PipelineStepObject? { return if (searchRequest.searchDirection == TimeRelation.AFTER) { messageStreams - .maxBy { it.currentElement?.lastScannedTime ?: Instant.MIN } + .maxByOrNull { it.currentElement?.lastScannedTime ?: Instant.MIN } ?.previousElement } else { messageStreams - .minBy { it.currentElement?.lastScannedTime ?: Instant.MIN } + .minByOrNull { it.currentElement?.lastScannedTime ?: Instant.MIN } ?.previousElement } } From 9538dd3be2733741a9eddc681ed31e116e51a9b9 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Fri, 1 Apr 2022 15:25:22 +0400 Subject: [PATCH 3/8] using SESSION_ALIAS_LABEL constant value --- .../exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt index b72d447f..5ca08d84 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt @@ -5,6 +5,7 @@ import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.message.direction import com.exactpro.th2.common.message.sequence import com.exactpro.th2.common.message.sessionAlias +import com.exactpro.th2.common.metrics.SESSION_ALIAS_LABEL import com.exactpro.th2.common.schema.grpc.router.GrpcRouter import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus @@ -125,7 +126,7 @@ class GrpcService( .first().rawMessage.metadata.id.connectionId.sessionAlias codecLatency.gaugeInc(listOf(request.streamName)) - codecService.decode(request.protobufRawMessageBatch, mapOf("session-alias" to sessionAlias), responseObserver) + codecService.decode(request.protobufRawMessageBatch, mapOf(SESSION_ALIAS_LABEL to sessionAlias), responseObserver) } else { codecService.decode(request.protobufRawMessageBatch, emptyMap(), responseObserver) } From 64871e87a2bc0aa88f4523b7336a29415ee0fe8d Mon Sep 17 00:00:00 2001 From: lumber1000 Date: Wed, 20 Apr 2022 21:06:17 +0400 Subject: [PATCH 4/8] gRPC and MQ services refactoring --- build.gradle | 2 +- .../exactpro/th2/rptdataprovider/Context.kt | 9 +- .../producers/MessageProducer.kt | 4 +- .../services/AbstractDecoderService.kt | 140 ++++++++++++++++ .../services/DecoderService.kt | 5 - .../services/grpc/GrpcService.kt | 156 +++--------------- .../services/rabbitmq/RabbitMqService.kt | 152 +---------------- 7 files changed, 176 insertions(+), 292 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt delete mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt diff --git a/build.gradle b/build.gradle index 5435b890..82ad3052 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ dependencies { implementation 'com.exactpro.th2:cradle-cassandra:2.20.2' implementation 'com.exactpro.th2:grpc-data-provider:0.1.4' - implementation 'com.exactpro.th2:grpc-codec:0.0.4-TH2-3340-1998223634-SNAPSHOT' + implementation 'com.exactpro.th2:grpc-codec:0.0.2' implementation 'io.github.microutils:kotlin-logging:1.7.10' diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt index 319801c2..676db20e 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt @@ -16,7 +16,6 @@ package com.exactpro.th2.rptdataprovider - import com.exactpro.cradle.CradleManager import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration @@ -42,17 +41,17 @@ import com.exactpro.th2.rptdataprovider.handlers.SearchMessagesHandler import com.exactpro.th2.rptdataprovider.producers.EventProducer import com.exactpro.th2.rptdataprovider.producers.MessageProducer import com.exactpro.th2.rptdataprovider.server.ServerType -import com.exactpro.th2.rptdataprovider.services.DecoderService +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService import com.exactpro.th2.rptdataprovider.services.cradle.CradleService import com.exactpro.th2.rptdataprovider.services.grpc.GrpcService import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import io.ktor.http.* +import io.ktor.http.CacheControl @Suppress("MemberVisibilityCanBePrivate") -class Context constructor( +class Context( val configuration: Configuration, val serverType: ServerType, @@ -78,7 +77,7 @@ class Context constructor( cradleManager ), - val decoderService: DecoderService = if (configuration.codecUseGrpc.value.toBoolean()) { + val decoderService: AbstractDecoderService = if (configuration.codecUseGrpc.value.toBoolean()) { GrpcService( configuration, grpcRouter diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt index 60195a0d..1426e8b0 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt @@ -23,11 +23,11 @@ import com.exactpro.th2.rptdataprovider.entities.internal.Message import com.exactpro.th2.rptdataprovider.services.cradle.CradleMessageNotFoundException import com.exactpro.th2.rptdataprovider.services.cradle.CradleService import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest -import com.exactpro.th2.rptdataprovider.services.DecoderService +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService class MessageProducer( private val cradle: CradleService, - private val rabbitMqService: DecoderService + private val rabbitMqService: AbstractDecoderService ) { suspend fun fromId(id: StoredMessageId): Message { diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt new file mode 100644 index 00000000..745dea1b --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt @@ -0,0 +1,140 @@ +/******************************************************************************* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package com.exactpro.th2.rptdataprovider.services + +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.message.getSessionAliasAndDirection +import com.exactpro.th2.common.message.sequence +import com.exactpro.th2.common.message.sessionAlias +import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration +import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus +import kotlinx.coroutines.* +import mu.KotlinLogging +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors + +abstract class AbstractDecoderService(configuration: Configuration) { + private val responseTimeout = configuration.codecResponseTimeout.value.toLong() + private val pendingRequests = ConcurrentHashMap() + private val usePinAttributes = configuration.codecUsePinAttributes.value.toBoolean() + private val maximumPendingRequests = configuration.codecPendingBatchLimit.value.toInt() + + private val requestSenderScope = CoroutineScope( + Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() + ) + + protected val callbackScope = CoroutineScope( + Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() + ) + + private val codecLatency = PipelineStatus.codecLatency + + protected fun handleResponse(decodedBatch: MessageGroupBatch) { + callbackScope.launch { + + val response = MessageGroupBatchWrapper(decodedBatch) + + LOGGER.trace { "codec response with hash ${response.responseHash} has been received" } + + pendingRequests.remove(response.requestId)?.let { + codecLatency.gaugeDec(listOf(it.streamName)) + codecLatency.setDuration(it.startTimestamp.toDouble(), listOf(it.streamName)) + it.completableDeferred.complete(response) + } + ?: LOGGER.trace { + val (firstSequence, lastSequence, stream) = getBatchProps(decodedBatch) + "codec response with hash ${response.responseHash} has no matching requests (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} requestId=${response.requestId})" + } + } + } + + suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { + return withContext(requestSenderScope.coroutineContext) { + while (pendingRequests.keys.size > maximumPendingRequests) { + delay(100) + } + + pendingRequests.computeIfAbsent(request.requestId) { + val pendingRequest = request.toPending() + + callbackScope.launch { + delay(responseTimeout) + + pendingRequest.completableDeferred.let { + if (it.isActive && + pendingRequests[request.requestId]?.completableDeferred == pendingRequest.completableDeferred + ) { + pendingRequests.remove(request.requestId) + it.complete(null) + + codecLatency.gaugeDec(listOf(request.streamName)) + codecLatency.setDuration( + pendingRequest.startTimestamp.toDouble(), + listOf(request.streamName) + ) + + LOGGER.warn { + val (firstSequence, lastSequence, stream) = getBatchProps(request.protobufRawMessageBatch) + "codec request timed out after $responseTimeout ms (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId}" + } + } + } + } + + try { + if (usePinAttributes) { + val sessionAlias = request.protobufRawMessageBatch.groupsList[0] + .messagesList[0].rawMessage.sessionAlias + + codecLatency.gaugeInc(listOf(request.streamName)) + decode(request.protobufRawMessageBatch, sessionAlias) + } else { + decode(request.protobufRawMessageBatch) + } + + LOGGER.trace { + val (firstSequence, lastSequence, stream) = getBatchProps(request.protobufRawMessageBatch) + "codec request with hash ${request.requestHash} has been sent (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId})" + } + LOGGER.debug { "codec request with hash ${request.requestHash.hashCode()} has been sent" } + + } catch (e: Exception) { + pendingRequest.completableDeferred.cancel("Unexpected exception while trying to send a codec request", e) + } + + pendingRequest + }.toResponse() + } + } + + protected abstract fun decode(messageGroupBatch: MessageGroupBatch) + protected abstract fun decode(messageGroupBatch: MessageGroupBatch, sessionAlias: String) + + private fun getBatchProps(batch: MessageGroupBatch): Triple { + val firstMessage = batch.groupsList.firstOrNull()?.messagesList?.firstOrNull() + + return Triple( + firstMessage?.sequence, + batch.groupsList.lastOrNull()?.messagesList?.lastOrNull()?.sequence, + firstMessage?.let { getSessionAliasAndDirection(firstMessage).joinToString(":") } + ) + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt deleted file mode 100644 index 2aeddf25..00000000 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/DecoderService.kt +++ /dev/null @@ -1,5 +0,0 @@ -package com.exactpro.th2.rptdataprovider.services - -interface DecoderService { - suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse -} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt index 5ca08d84..cd8278c7 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt @@ -1,157 +1,45 @@ +/******************************************************************************* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + package com.exactpro.th2.rptdataprovider.services.grpc import com.exactpro.th2.codec.grpc.AsyncCodecService import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.message.direction -import com.exactpro.th2.common.message.sequence -import com.exactpro.th2.common.message.sessionAlias import com.exactpro.th2.common.metrics.SESSION_ALIAS_LABEL import com.exactpro.th2.common.schema.grpc.router.GrpcRouter import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration -import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus -import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest -import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse -import com.exactpro.th2.rptdataprovider.services.CodecRequestId -import com.exactpro.th2.rptdataprovider.services.DecoderService -import com.exactpro.th2.rptdataprovider.services.MessageGroupBatchWrapper -import com.exactpro.th2.rptdataprovider.services.PendingCodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService import io.grpc.stub.StreamObserver -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.cancel -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext import mu.KotlinLogging -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors class GrpcService( configuration: Configuration, grpcRouter: GrpcRouter -) : DecoderService { - - private val responseTimeout = configuration.codecResponseTimeout.value.toLong() - private val pendingRequests = ConcurrentHashMap() - private val usePinAttributes = configuration.codecUsePinAttributes.value.toBoolean() +) : AbstractDecoderService(configuration) { private val codecService = grpcRouter.getService(AsyncCodecService::class.java) - private val maximumPendingRequests = configuration.codecPendingBatchLimit.value.toInt() - - private val requestSenderScope = CoroutineScope( - Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() - ) - - private val callbackScope = CoroutineScope( - Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() - ) - - private val codecLatency = PipelineStatus.codecLatency - private val responseObserver = object : StreamObserver { - override fun onNext(decodedBatch: MessageGroupBatch) { - callbackScope.launch { - - val response = MessageGroupBatchWrapper(decodedBatch) - - LOGGER.trace { "codec response with hash ${response.responseHash} has been received" } - - pendingRequests.remove(response.requestId)?.let { - codecLatency.gaugeDec(listOf(it.streamName)) - codecLatency.setDuration(it.startTimestamp.toDouble(), listOf(it.streamName)) - it.completableDeferred.complete(response) - } - ?: LOGGER.trace { - val firstSequence = - decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence - val lastSequence = - decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence - val stream = - "${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}" - "codec response with hash ${response.responseHash} has no matching requests (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} requestId=${response.requestId})" - } - } - } - + override fun onNext(decodedBatch: MessageGroupBatch) = handleResponse(decodedBatch) override fun onError(t: Throwable?) = LOGGER.error(t) { "gRPC request failed." } override fun onCompleted() {} } - override suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { - - return withContext(requestSenderScope.coroutineContext) { - while (pendingRequests.keys.size > maximumPendingRequests) { - delay(100) - } - - pendingRequests.computeIfAbsent(request.requestId) { - val pendingRequest = request.toPending() - - callbackScope.launch { - delay(responseTimeout) - - pendingRequest.completableDeferred.let { - if (it.isActive && - pendingRequests[request.requestId]?.completableDeferred == pendingRequest.completableDeferred - ) { - - pendingRequests.remove(request.requestId) - it.complete(null) - - codecLatency.gaugeDec(listOf(request.streamName)) - codecLatency.setDuration( - pendingRequest.startTimestamp.toDouble(), - listOf(request.streamName) - ) - - LOGGER.warn { - val firstSequence = - request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence - val lastSequence = - request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence - val stream = - "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" - - "codec request timed out after $responseTimeout ms (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId}" - } - } - } - } - - try { - if (usePinAttributes) { - val sessionAlias = - request.protobufRawMessageBatch.groupsList - .first().messagesList - .first().rawMessage.metadata.id.connectionId.sessionAlias - - codecLatency.gaugeInc(listOf(request.streamName)) - codecService.decode(request.protobufRawMessageBatch, mapOf(SESSION_ALIAS_LABEL to sessionAlias), responseObserver) - } else { - codecService.decode(request.protobufRawMessageBatch, emptyMap(), responseObserver) - } - - LOGGER.trace { - val firstSequence = - request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence - val lastSequence = - request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence - val stream = - "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" - - "codec request with hash ${request.requestHash} has been sent (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId})" - } - LOGGER.debug { "codec request with hash ${request.requestHash.hashCode()} has been sent" } - - } catch (e: Exception) { - pendingRequest.completableDeferred.cancel( - "Unexpected exception while trying to send a codec request", e - ) - } - pendingRequest - }.toResponse() - } - } + override fun decode(messageGroupBatch: MessageGroupBatch) = codecService.decode(messageGroupBatch, responseObserver) + override fun decode(messageGroupBatch: MessageGroupBatch, sessionAlias: String) = + codecService.decode(messageGroupBatch, mapOf(SESSION_ALIAS_LABEL to sessionAlias), responseObserver) companion object { private val LOGGER = KotlinLogging.logger { } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt index 5fa52b1e..9443ddfa 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt @@ -17,159 +17,21 @@ package com.exactpro.th2.rptdataprovider.services.rabbitmq import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.message.direction -import com.exactpro.th2.common.message.sequence -import com.exactpro.th2.common.message.sessionAlias -import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration -import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus -import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest -import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse -import com.exactpro.th2.rptdataprovider.services.CodecRequestId -import com.exactpro.th2.rptdataprovider.services.DecoderService -import com.exactpro.th2.rptdataprovider.services.MessageGroupBatchWrapper -import com.exactpro.th2.rptdataprovider.services.PendingCodecBatchRequest -import kotlinx.coroutines.* -import mu.KotlinLogging -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService class RabbitMqService( configuration: Configuration, messageRouterParsedBatch: MessageRouter, private val messageRouterRawBatch: MessageRouter -) : DecoderService { +) : AbstractDecoderService(configuration) { - companion object { - private val logger = KotlinLogging.logger { } + init { + messageRouterParsedBatch.subscribeAll({ _, decodedBatch -> handleResponse(decodedBatch) }, "from_codec") } - private val responseTimeout = configuration.codecResponseTimeout.value.toLong() - private val pendingRequests = ConcurrentHashMap() - - private val usePinAttributes = configuration.codecUsePinAttributes.value.toBoolean() - - private val maximumPendingRequests = configuration.codecPendingBatchLimit.value.toInt() - - private val mqRequestSenderScope = CoroutineScope( - Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() - ) - - private val mqCallbackScope = CoroutineScope( - Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() - ) - - private val codecLatency = PipelineStatus.codecLatency - - @Suppress("unused") - private val receiveChannel = mqCallbackScope.launch { - messageRouterParsedBatch.subscribeAll( - MessageListener { _, decodedBatch -> - mqCallbackScope.launch { - - val response = MessageGroupBatchWrapper(decodedBatch) - - logger.trace { "codec response with hash ${response.responseHash} has been received" } - - pendingRequests.remove(response.requestId)?.let { - codecLatency.gaugeDec(listOf(it.streamName)) - codecLatency.setDuration(it.startTimestamp.toDouble(), listOf(it.streamName)) - it.completableDeferred.complete(response) - } - ?: logger.trace { - val firstSequence = - decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence - val lastSequence = - decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence - val stream = - "${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}" - "codec response with hash ${response.responseHash} has no matching requests (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} requestId=${response.requestId})" - } - } - }, - - "from_codec" - ) - } - - override suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { - - return withContext(mqRequestSenderScope.coroutineContext) { - while (pendingRequests.keys.size > maximumPendingRequests) { - delay(100) - } - - pendingRequests.computeIfAbsent(request.requestId) { - val pendingRequest = request.toPending() - - mqCallbackScope.launch { - delay(responseTimeout) - - pendingRequest.completableDeferred.let { - if (it.isActive && - pendingRequests[request.requestId]?.completableDeferred == pendingRequest.completableDeferred - ) { - - pendingRequests.remove(request.requestId) - it.complete(null) - - codecLatency.gaugeDec(listOf(request.streamName)) - codecLatency.setDuration( - pendingRequest.startTimestamp.toDouble(), - listOf(request.streamName) - ) - - logger.warn { - val firstSequence = - request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence - val lastSequence = - request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence - val stream = - "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" - - "codec request timed out after $responseTimeout ms (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId}" - } - } - } - } - - try { - - if (usePinAttributes) { - val sessionAlias = - request.protobufRawMessageBatch.groupsList - .first().messagesList - .first().rawMessage.metadata.id.connectionId.sessionAlias - - codecLatency.gaugeInc(listOf(request.streamName)) - - messageRouterRawBatch.sendAll(request.protobufRawMessageBatch, sessionAlias) - } else { - messageRouterRawBatch.sendAll(request.protobufRawMessageBatch) - } - - logger.trace { - val firstSequence = - request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence - val lastSequence = - request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence - val stream = - "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" - - - "codec request with hash ${request.requestHash} has been sent (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId})" - } - logger.debug { "codec request with hash ${request.requestHash.hashCode()} has been sent" } - - } catch (e: Exception) { - pendingRequest.completableDeferred.cancel( - "Unexpected exception while trying to send a codec request", e - ) - } - - pendingRequest - }.toResponse() - } - } + override fun decode(messageGroupBatch: MessageGroupBatch) = messageRouterRawBatch.sendAll(messageGroupBatch) + override fun decode(messageGroupBatch: MessageGroupBatch, sessionAlias: String) = + messageRouterRawBatch.sendAll(messageGroupBatch, sessionAlias) } \ No newline at end of file From 7377c0228a8ba38bd51c0664a5fff339883fccf4 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Thu, 21 Apr 2022 14:33:48 +0400 Subject: [PATCH 5/8] access modifier updated --- .../th2/rptdataprovider/services/AbstractDecoderService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt index 745dea1b..70cf96c2 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt @@ -37,7 +37,7 @@ abstract class AbstractDecoderService(configuration: Configuration) { Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() ) - protected val callbackScope = CoroutineScope( + private val callbackScope = CoroutineScope( Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() ) From 357acff9a4bf01aa50f099579ed4a5d51c4c0d4f Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Wed, 1 Jun 2022 15:16:26 +0400 Subject: [PATCH 6/8] cradle update --- build.gradle | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 82ad3052..9033621b 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2009-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2009-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,8 +50,8 @@ dependencies { implementation group: 'net.jpountz.lz4', name: 'lz4', version: '1.3.0' - implementation 'com.exactpro.th2:cradle-core:2.20.2' - implementation 'com.exactpro.th2:cradle-cassandra:2.20.2' + implementation 'com.exactpro.th2:cradle-core:2.21.0' + implementation 'com.exactpro.th2:cradle-cassandra:2.21.0' implementation 'com.exactpro.th2:grpc-data-provider:0.1.4' implementation 'com.exactpro.th2:grpc-codec:0.0.2' From 784dcde88819c0598ad320b77973fed9132e2794 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Wed, 1 Jun 2022 16:48:28 +0400 Subject: [PATCH 7/8] logging --- .../exactpro/th2/rptdataprovider/cache/MessageCache.kt | 3 ++- .../th2/rptdataprovider/producers/MessageProducer.kt | 8 ++++++++ .../com/exactpro/th2/rptdataprovider/server/HttpServer.kt | 3 +-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt index d217b90b..ccf8ea73 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt @@ -57,9 +57,10 @@ class MessageCache(configuration: Configuration, private val messageProducer: Me @InternalCoroutinesApi suspend fun getOrPut(id: String): Message { + logger.info { "MessageCache::getOrPut($id)" } return cache.get(id) ?: messageProducer.fromId(StoredMessageId.fromString(id)).also { - logger.debug { "Message cache miss for id=$id" } + logger.info { "Message cache miss for id=$id" } val type = it.parsedMessageGroup?.get(0)?.messageType diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt index 1426e8b0..d3b910c0 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt @@ -24,6 +24,7 @@ import com.exactpro.th2.rptdataprovider.services.cradle.CradleMessageNotFoundExc import com.exactpro.th2.rptdataprovider.services.cradle.CradleService import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService +import mu.KotlinLogging class MessageProducer( private val cradle: CradleService, @@ -33,6 +34,8 @@ class MessageProducer( return cradle.getMessageSuspend(id)?.let { stored -> + LOGGER.info { "Stored message received from cradle: ${stored.id}" } + val decoded = rabbitMqService.sendToCodec( CodecBatchRequest( MessageGroupBatch @@ -62,4 +65,9 @@ class MessageProducer( } ?: throw CradleMessageNotFoundException("message '${id}' does not exist in cradle") } + + companion object { + private val LOGGER = KotlinLogging.logger {} + } + } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt index b556d0d2..a974c8de 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt @@ -31,10 +31,8 @@ import com.exactpro.th2.rptdataprovider.entities.sse.EventType import com.exactpro.th2.rptdataprovider.entities.sse.HttpWriter import com.exactpro.th2.rptdataprovider.entities.sse.SseEvent import com.exactpro.th2.rptdataprovider.entities.sse.StreamWriter -import com.exactpro.th2.rptdataprovider.grpc.RptDataProviderGrpcHandler import com.exactpro.th2.rptdataprovider.logMetrics import com.exactpro.th2.rptdataprovider.services.cradle.CradleObjectNotFoundException -import io.grpc.Status import io.ktor.application.* import io.ktor.features.* import io.ktor.http.* @@ -355,6 +353,7 @@ class HttpServer(private val applicationContext: Context) { } get("/message/{id}") { + logger.info { "Requested message: ${call.parameters["id"]}" } val probe = call.parameters["probe"]?.toBoolean() ?: false handleRequest( call, context, "get single message", From a7b2bdf52e58375a15e111f1e0689cdb2bb02969 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Wed, 1 Jun 2022 18:20:24 +0400 Subject: [PATCH 8/8] logging --- .../exactpro/th2/rptdataprovider/cache/MessageCache.kt | 3 +-- .../th2/rptdataprovider/producers/MessageProducer.kt | 9 --------- .../exactpro/th2/rptdataprovider/server/HttpServer.kt | 9 ++++++--- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt index ccf8ea73..d217b90b 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/cache/MessageCache.kt @@ -57,10 +57,9 @@ class MessageCache(configuration: Configuration, private val messageProducer: Me @InternalCoroutinesApi suspend fun getOrPut(id: String): Message { - logger.info { "MessageCache::getOrPut($id)" } return cache.get(id) ?: messageProducer.fromId(StoredMessageId.fromString(id)).also { - logger.info { "Message cache miss for id=$id" } + logger.debug { "Message cache miss for id=$id" } val type = it.parsedMessageGroup?.get(0)?.messageType diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt index d3b910c0..6736e5b8 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt @@ -24,7 +24,6 @@ import com.exactpro.th2.rptdataprovider.services.cradle.CradleMessageNotFoundExc import com.exactpro.th2.rptdataprovider.services.cradle.CradleService import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService -import mu.KotlinLogging class MessageProducer( private val cradle: CradleService, @@ -33,9 +32,6 @@ class MessageProducer( suspend fun fromId(id: StoredMessageId): Message { return cradle.getMessageSuspend(id)?.let { stored -> - - LOGGER.info { "Stored message received from cradle: ${stored.id}" } - val decoded = rabbitMqService.sendToCodec( CodecBatchRequest( MessageGroupBatch @@ -65,9 +61,4 @@ class MessageProducer( } ?: throw CradleMessageNotFoundException("message '${id}' does not exist in cradle") } - - companion object { - private val LOGGER = KotlinLogging.logger {} - } - } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt index a974c8de..3c4d566c 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt @@ -128,6 +128,7 @@ class HttpServer(private val applicationContext: Context) { @InternalAPI private suspend fun sendErrorCode(call: ApplicationCall, e: Exception, code: HttpStatusCode) { + logger.debug { "sendErrorCode(). Exception = $e, HttpStatusCode = ${code.value})" } withContext(NonCancellable) { call.respondText(e.rootCause?.message ?: e.toString(), ContentType.Text.Plain, code) } @@ -260,9 +261,12 @@ class HttpServer(private val applicationContext: Context) { checkContext(context) } cacheControl?.let { call.response.cacheControl(it) } + val responseText = jacksonMapper.asStringSuspend(calledFun.invoke()) + logger.trace { "RestAPI response: $responseText" } call.respondText( - jacksonMapper.asStringSuspend(calledFun.invoke()), - ContentType.Application.Json + responseText, + ContentType.Application.Json, + HttpStatusCode.OK ) coroutineContext.cancelChildren() }.join() @@ -353,7 +357,6 @@ class HttpServer(private val applicationContext: Context) { } get("/message/{id}") { - logger.info { "Requested message: ${call.parameters["id"]}" } val probe = call.parameters["probe"]?.toBoolean() ?: false handleRequest( call, context, "get single message",