diff --git a/build.gradle b/build.gradle index c0472c6a..9033621b 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2009-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2009-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ plugins { id 'java' - id 'org.jetbrains.kotlin.jvm' version '1.3.72' + id 'org.jetbrains.kotlin.jvm' version '1.5.32' id 'application' id 'com.palantir.docker' version '0.25.0' } @@ -46,14 +46,15 @@ dependencies { implementation 'org.slf4j:slf4j-api' implementation 'org.slf4j:slf4j-log4j12' - implementation 'com.exactpro.th2:common:3.31.0' + implementation 'com.exactpro.th2:common:3.34.0-TH2-3340-1995979126-SNAPSHOT' implementation group: 'net.jpountz.lz4', name: 'lz4', version: '1.3.0' - implementation 'com.exactpro.th2:cradle-core:2.20.2' - implementation 'com.exactpro.th2:cradle-cassandra:2.20.2' + implementation 'com.exactpro.th2:cradle-core:2.21.0' + implementation 'com.exactpro.th2:cradle-cassandra:2.21.0' implementation 'com.exactpro.th2:grpc-data-provider:0.1.4' + implementation 'com.exactpro.th2:grpc-codec:0.0.2' implementation 'io.github.microutils:kotlin-logging:1.7.10' diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt index cc482345..676db20e 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,10 @@ package com.exactpro.th2.rptdataprovider - import com.exactpro.cradle.CradleManager -import com.exactpro.th2.common.grpc.MessageBatch -import com.exactpro.th2.common.grpc.MessageGroup import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.grpc.RawMessageBatch import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration +import com.exactpro.th2.common.schema.grpc.router.GrpcRouter import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.rptdataprovider.cache.EventCache import com.exactpro.th2.rptdataprovider.cache.MessageCache @@ -44,12 +41,14 @@ import com.exactpro.th2.rptdataprovider.handlers.SearchMessagesHandler import com.exactpro.th2.rptdataprovider.producers.EventProducer import com.exactpro.th2.rptdataprovider.producers.MessageProducer import com.exactpro.th2.rptdataprovider.server.ServerType +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService import com.exactpro.th2.rptdataprovider.services.cradle.CradleService +import com.exactpro.th2.rptdataprovider.services.grpc.GrpcService import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import io.ktor.http.* +import io.ktor.http.CacheControl @Suppress("MemberVisibilityCanBePrivate") class Context( @@ -68,6 +67,9 @@ class Context( val messageRouterRawBatch: MessageRouter, val messageRouterParsedBatch: MessageRouter, + + val grpcRouter: GrpcRouter, + val grpcConfig: GrpcConfiguration, val cradleService: CradleService = CradleService( @@ -75,11 +77,18 @@ class Context( cradleManager ), - val rabbitMqService: RabbitMqService = RabbitMqService( - configuration, - messageRouterParsedBatch, - messageRouterRawBatch - ), + val decoderService: AbstractDecoderService = if (configuration.codecUseGrpc.value.toBoolean()) { + GrpcService( + configuration, + grpcRouter + ) + } else { + RabbitMqService( + configuration, + messageRouterParsedBatch, + messageRouterRawBatch + ) + }, val eventProducer: EventProducer = EventProducer(cradleService, jacksonMapper), @@ -87,7 +96,7 @@ class Context( val messageProducer: MessageProducer = MessageProducer( cradleService, - rabbitMqService + decoderService ), val messageCache: MessageCache = MessageCache(configuration, messageProducer), @@ -145,4 +154,4 @@ class Context( } } } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt index 4a34aad9..20972980 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,6 +77,7 @@ class Main { messageRouterParsedBatch = configurationFactory.messageRouterMessageGroupBatch.also { resources += it }, + grpcRouter = configurationFactory.grpcRouter, grpcConfig = configurationFactory.grpcConfiguration ) } @@ -174,4 +175,4 @@ fun main(args: Array) { logger.error(ex) { "Cannot start the box" } exitProcess(1) } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt index f7b7b6ab..fd597356 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ class CustomConfigurationClass { val codecCallbackThreadPool: Int = 10 val codecRequestThreadPool: Int = 1 val codecUsePinAttributes: Boolean = true + val codecUseGrpc: Boolean = false val grpcWriterMessageBuffer: Int = 100 @@ -205,4 +206,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) { val codecUsePinAttributes: Variable = Variable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes.toString(), "true") -} + + val codecUseGrpc: Variable = + Variable("codecUseGrpc", customConfiguration.codecUseGrpc.toString(), "false") +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt index a15c74fd..e4278ddc 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/internal/PipelineStepObject.kt @@ -18,8 +18,8 @@ package com.exactpro.th2.rptdataprovider.entities.internal import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.rptdataprovider.entities.responses.MessageBatchWrapper -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchResponse +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse import java.time.Instant data class StreamEndObject( diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt index 5ff4f7e1..703b35f9 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt @@ -6,11 +6,10 @@ import com.exactpro.th2.rptdataprovider.Context import com.exactpro.th2.rptdataprovider.entities.internal.PipelineCodecRequest import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest -import com.exactpro.th2.rptdataprovider.entities.responses.MessageBatchWrapper import com.exactpro.th2.rptdataprovider.handlers.PipelineComponent import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus import com.exactpro.th2.rptdataprovider.handlers.StreamName -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -131,4 +130,4 @@ class MessageBatchConverter( sendToChannel(pipelineMessage) } } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt index 726232a8..0ca01331 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt @@ -9,7 +9,7 @@ import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchReques import com.exactpro.th2.rptdataprovider.handlers.PipelineComponent import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus import com.exactpro.th2.rptdataprovider.handlers.StreamName -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchResponse +import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.isActive @@ -106,7 +106,7 @@ class MessageBatchDecoder( pipelineMessage.lastProcessedId, pipelineMessage.lastScannedTime, pipelineMessage.storedBatchWrapper, - context.rabbitMqService.sendToCodec(pipelineMessage.codecRequest), + context.decoderService.sendToCodec(pipelineMessage.codecRequest), protocol ) pipelineStatus.decodeEnd(streamName.toString(), pipelineMessage.codecRequest.protobufRawMessageBatch.groupsCount.toLong()) @@ -125,4 +125,4 @@ class MessageBatchDecoder( sendToChannel(pipelineMessage) } } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt index 0ffd8490..565e2383 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt @@ -124,11 +124,11 @@ class StreamMerger( private fun getLastScannedObject(): PipelineStepObject? { return if (searchRequest.searchDirection == TimeRelation.AFTER) { messageStreams - .maxBy { it.currentElement?.lastScannedTime ?: Instant.MIN } + .maxByOrNull { it.currentElement?.lastScannedTime ?: Instant.MIN } ?.previousElement } else { messageStreams - .minBy { it.currentElement?.lastScannedTime ?: Instant.MIN } + .minByOrNull { it.currentElement?.lastScannedTime ?: Instant.MIN } ?.previousElement } } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt index d58ac647..6736e5b8 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/MessageProducer.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,21 +20,18 @@ import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.common.grpc.* import com.exactpro.th2.rptdataprovider.entities.internal.BodyWrapper import com.exactpro.th2.rptdataprovider.entities.internal.Message -import com.exactpro.th2.rptdataprovider.handlers.StreamName import com.exactpro.th2.rptdataprovider.services.cradle.CradleMessageNotFoundException import com.exactpro.th2.rptdataprovider.services.cradle.CradleService -import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest -import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService +import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService class MessageProducer( private val cradle: CradleService, - private val rabbitMqService: RabbitMqService + private val rabbitMqService: AbstractDecoderService ) { - suspend fun fromId(id: StoredMessageId): Message { return cradle.getMessageSuspend(id)?.let { stored -> - val decoded = rabbitMqService.sendToCodec( CodecBatchRequest( MessageGroupBatch @@ -62,8 +59,6 @@ class MessageProducer( Message(stored, decoded, stored.content, setOf()) } - ?: throw CradleMessageNotFoundException("message '${id}' does not exist in cradle") } -} - +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt index b556d0d2..3c4d566c 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt @@ -31,10 +31,8 @@ import com.exactpro.th2.rptdataprovider.entities.sse.EventType import com.exactpro.th2.rptdataprovider.entities.sse.HttpWriter import com.exactpro.th2.rptdataprovider.entities.sse.SseEvent import com.exactpro.th2.rptdataprovider.entities.sse.StreamWriter -import com.exactpro.th2.rptdataprovider.grpc.RptDataProviderGrpcHandler import com.exactpro.th2.rptdataprovider.logMetrics import com.exactpro.th2.rptdataprovider.services.cradle.CradleObjectNotFoundException -import io.grpc.Status import io.ktor.application.* import io.ktor.features.* import io.ktor.http.* @@ -130,6 +128,7 @@ class HttpServer(private val applicationContext: Context) { @InternalAPI private suspend fun sendErrorCode(call: ApplicationCall, e: Exception, code: HttpStatusCode) { + logger.debug { "sendErrorCode(). Exception = $e, HttpStatusCode = ${code.value})" } withContext(NonCancellable) { call.respondText(e.rootCause?.message ?: e.toString(), ContentType.Text.Plain, code) } @@ -262,9 +261,12 @@ class HttpServer(private val applicationContext: Context) { checkContext(context) } cacheControl?.let { call.response.cacheControl(it) } + val responseText = jacksonMapper.asStringSuspend(calledFun.invoke()) + logger.trace { "RestAPI response: $responseText" } call.respondText( - jacksonMapper.asStringSuspend(calledFun.invoke()), - ContentType.Application.Json + responseText, + ContentType.Application.Json, + HttpStatusCode.OK ) coroutineContext.cancelChildren() }.join() diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt new file mode 100644 index 00000000..70cf96c2 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/AbstractDecoderService.kt @@ -0,0 +1,140 @@ +/******************************************************************************* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package com.exactpro.th2.rptdataprovider.services + +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.message.getSessionAliasAndDirection +import com.exactpro.th2.common.message.sequence +import com.exactpro.th2.common.message.sessionAlias +import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration +import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus +import kotlinx.coroutines.* +import mu.KotlinLogging +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors + +abstract class AbstractDecoderService(configuration: Configuration) { + private val responseTimeout = configuration.codecResponseTimeout.value.toLong() + private val pendingRequests = ConcurrentHashMap() + private val usePinAttributes = configuration.codecUsePinAttributes.value.toBoolean() + private val maximumPendingRequests = configuration.codecPendingBatchLimit.value.toInt() + + private val requestSenderScope = CoroutineScope( + Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() + ) + + private val callbackScope = CoroutineScope( + Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() + ) + + private val codecLatency = PipelineStatus.codecLatency + + protected fun handleResponse(decodedBatch: MessageGroupBatch) { + callbackScope.launch { + + val response = MessageGroupBatchWrapper(decodedBatch) + + LOGGER.trace { "codec response with hash ${response.responseHash} has been received" } + + pendingRequests.remove(response.requestId)?.let { + codecLatency.gaugeDec(listOf(it.streamName)) + codecLatency.setDuration(it.startTimestamp.toDouble(), listOf(it.streamName)) + it.completableDeferred.complete(response) + } + ?: LOGGER.trace { + val (firstSequence, lastSequence, stream) = getBatchProps(decodedBatch) + "codec response with hash ${response.responseHash} has no matching requests (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} requestId=${response.requestId})" + } + } + } + + suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { + return withContext(requestSenderScope.coroutineContext) { + while (pendingRequests.keys.size > maximumPendingRequests) { + delay(100) + } + + pendingRequests.computeIfAbsent(request.requestId) { + val pendingRequest = request.toPending() + + callbackScope.launch { + delay(responseTimeout) + + pendingRequest.completableDeferred.let { + if (it.isActive && + pendingRequests[request.requestId]?.completableDeferred == pendingRequest.completableDeferred + ) { + pendingRequests.remove(request.requestId) + it.complete(null) + + codecLatency.gaugeDec(listOf(request.streamName)) + codecLatency.setDuration( + pendingRequest.startTimestamp.toDouble(), + listOf(request.streamName) + ) + + LOGGER.warn { + val (firstSequence, lastSequence, stream) = getBatchProps(request.protobufRawMessageBatch) + "codec request timed out after $responseTimeout ms (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId}" + } + } + } + } + + try { + if (usePinAttributes) { + val sessionAlias = request.protobufRawMessageBatch.groupsList[0] + .messagesList[0].rawMessage.sessionAlias + + codecLatency.gaugeInc(listOf(request.streamName)) + decode(request.protobufRawMessageBatch, sessionAlias) + } else { + decode(request.protobufRawMessageBatch) + } + + LOGGER.trace { + val (firstSequence, lastSequence, stream) = getBatchProps(request.protobufRawMessageBatch) + "codec request with hash ${request.requestHash} has been sent (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId})" + } + LOGGER.debug { "codec request with hash ${request.requestHash.hashCode()} has been sent" } + + } catch (e: Exception) { + pendingRequest.completableDeferred.cancel("Unexpected exception while trying to send a codec request", e) + } + + pendingRequest + }.toResponse() + } + } + + protected abstract fun decode(messageGroupBatch: MessageGroupBatch) + protected abstract fun decode(messageGroupBatch: MessageGroupBatch, sessionAlias: String) + + private fun getBatchProps(batch: MessageGroupBatch): Triple { + val firstMessage = batch.groupsList.firstOrNull()?.messagesList?.firstOrNull() + + return Triple( + firstMessage?.sequence, + batch.groupsList.lastOrNull()?.messagesList?.lastOrNull()?.sequence, + firstMessage?.let { getSessionAliasAndDirection(firstMessage).joinToString(":") } + ) + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/CodecRequest.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/CodecRequest.kt similarity index 98% rename from src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/CodecRequest.kt rename to src/main/kotlin/com/exactpro/th2/rptdataprovider/services/CodecRequest.kt index 5a7c56b9..0e5052f8 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/CodecRequest.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/CodecRequest.kt @@ -14,7 +14,7 @@ * limitations under the License. ******************************************************************************/ -package com.exactpro.th2.rptdataprovider.services.rabbitmq +package com.exactpro.th2.rptdataprovider.services import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.MessageGroupBatch diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt new file mode 100644 index 00000000..cd8278c7 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/grpc/GrpcService.kt @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package com.exactpro.th2.rptdataprovider.services.grpc + +import com.exactpro.th2.codec.grpc.AsyncCodecService +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.metrics.SESSION_ALIAS_LABEL +import com.exactpro.th2.common.schema.grpc.router.GrpcRouter +import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService +import io.grpc.stub.StreamObserver +import mu.KotlinLogging + +class GrpcService( + configuration: Configuration, + grpcRouter: GrpcRouter +) : AbstractDecoderService(configuration) { + + private val codecService = grpcRouter.getService(AsyncCodecService::class.java) + private val responseObserver = object : StreamObserver { + override fun onNext(decodedBatch: MessageGroupBatch) = handleResponse(decodedBatch) + override fun onError(t: Throwable?) = LOGGER.error(t) { "gRPC request failed." } + override fun onCompleted() {} + } + + override fun decode(messageGroupBatch: MessageGroupBatch) = codecService.decode(messageGroupBatch, responseObserver) + override fun decode(messageGroupBatch: MessageGroupBatch, sessionAlias: String) = + codecService.decode(messageGroupBatch, mapOf(SESSION_ALIAS_LABEL to sessionAlias), responseObserver) + + companion object { + private val LOGGER = KotlinLogging.logger { } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt index 00770153..9443ddfa 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/rabbitmq/RabbitMqService.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,153 +17,21 @@ package com.exactpro.th2.rptdataprovider.services.rabbitmq import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.message.direction -import com.exactpro.th2.common.message.sequence -import com.exactpro.th2.common.message.sessionAlias -import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration -import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus -import kotlinx.coroutines.* -import mu.KotlinLogging -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors +import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService class RabbitMqService( configuration: Configuration, messageRouterParsedBatch: MessageRouter, private val messageRouterRawBatch: MessageRouter -) { +) : AbstractDecoderService(configuration) { - companion object { - val logger = KotlinLogging.logger { } + init { + messageRouterParsedBatch.subscribeAll({ _, decodedBatch -> handleResponse(decodedBatch) }, "from_codec") } - private val responseTimeout = configuration.codecResponseTimeout.value.toLong() - private val pendingRequests = ConcurrentHashMap() - - private val usePinAttributes = configuration.codecUsePinAttributes.value.toBoolean() - - private val maximumPendingRequests = configuration.codecPendingBatchLimit.value.toInt() - - private val mqRequestSenderScope = CoroutineScope( - Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() - ) - - private val mqCallbackScope = CoroutineScope( - Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() - ) - - private val codecLatency = PipelineStatus.codecLatency - - @Suppress("unused") - private val receiveChannel = mqCallbackScope.launch { - messageRouterParsedBatch.subscribeAll( - MessageListener { _, decodedBatch -> - mqCallbackScope.launch { - - val response = MessageGroupBatchWrapper(decodedBatch) - - logger.trace { "codec response with hash ${response.responseHash} has been received" } - - pendingRequests.remove(response.requestId)?.let { - codecLatency.gaugeDec(listOf(it.streamName)) - codecLatency.setDuration(it.startTimestamp.toDouble(), listOf(it.streamName)) - it.completableDeferred.complete(response) - } - ?: logger.trace { - val firstSequence = - decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence - val lastSequence = - decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence - val stream = - "${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}" - "codec response with hash ${response.responseHash} has no matching requests (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} requestId=${response.requestId})" - } - } - }, - - "from_codec" - ) - } - - suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { - - return withContext(mqRequestSenderScope.coroutineContext) { - while (pendingRequests.keys.size > maximumPendingRequests) { - delay(100) - } - - pendingRequests.computeIfAbsent(request.requestId) { - val pendingRequest = request.toPending() - - mqCallbackScope.launch { - delay(responseTimeout) - - pendingRequest.completableDeferred.let { - if (it.isActive && - pendingRequests[request.requestId]?.completableDeferred == pendingRequest.completableDeferred - ) { - - pendingRequests.remove(request.requestId) - it.complete(null) - - codecLatency.gaugeDec(listOf(request.streamName)) - codecLatency.setDuration( - pendingRequest.startTimestamp.toDouble(), - listOf(request.streamName) - ) - - logger.warn { - val firstSequence = - request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence - val lastSequence = - request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence - val stream = - "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" - - "codec request timed out after $responseTimeout ms (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId}" - } - } - } - } - - try { - - if (usePinAttributes) { - val sessionAlias = - request.protobufRawMessageBatch.groupsList - .first().messagesList - .first().rawMessage.metadata.id.connectionId.sessionAlias - - codecLatency.gaugeInc(listOf(request.streamName)) - - messageRouterRawBatch.sendAll(request.protobufRawMessageBatch, sessionAlias) - } else { - messageRouterRawBatch.sendAll(request.protobufRawMessageBatch) - } - - logger.trace { - val firstSequence = - request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence - val lastSequence = - request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence - val stream = - "${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" - - - "codec request with hash ${request.requestHash} has been sent (stream=${stream} firstId=${firstSequence} lastId=${lastSequence} hash=${request.requestHash}) requestId=${request.requestId})" - } - logger.debug { "codec request with hash ${request.requestHash.hashCode()} has been sent" } - - } catch (e: Exception) { - pendingRequest.completableDeferred.cancel( - "Unexpected exception while trying to send a codec request", e - ) - } - - pendingRequest - }.toResponse() - } - } + override fun decode(messageGroupBatch: MessageGroupBatch) = messageRouterRawBatch.sendAll(messageGroupBatch) + override fun decode(messageGroupBatch: MessageGroupBatch, sessionAlias: String) = + messageRouterRawBatch.sendAll(messageGroupBatch, sessionAlias) } \ No newline at end of file