Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

plugins {
id 'java'
id 'org.jetbrains.kotlin.jvm' version '1.3.72'
id 'org.jetbrains.kotlin.jvm' version '1.5.32'
id 'application'
id 'com.palantir.docker' version '0.25.0'
}
Expand Down Expand Up @@ -46,14 +46,15 @@ dependencies {
implementation 'org.slf4j:slf4j-api'
implementation 'org.slf4j:slf4j-log4j12'

implementation 'com.exactpro.th2:common:3.31.0'
implementation 'com.exactpro.th2:common:3.34.0-TH2-3340-1995979126-SNAPSHOT'

implementation group: 'net.jpountz.lz4', name: 'lz4', version: '1.3.0'

implementation 'com.exactpro.th2:cradle-core:2.20.2'
implementation 'com.exactpro.th2:cradle-cassandra:2.20.2'

implementation 'com.exactpro.th2:grpc-data-provider:0.1.4'
implementation 'com.exactpro.th2:grpc-codec:0.0.2'

implementation 'io.github.microutils:kotlin-logging:1.7.10'

Expand Down
35 changes: 22 additions & 13 deletions src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,13 +16,10 @@

package com.exactpro.th2.rptdataprovider


import com.exactpro.cradle.CradleManager
import com.exactpro.th2.common.grpc.MessageBatch
import com.exactpro.th2.common.grpc.MessageGroup
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration
import com.exactpro.th2.common.schema.grpc.router.GrpcRouter
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.rptdataprovider.cache.EventCache
import com.exactpro.th2.rptdataprovider.cache.MessageCache
Expand All @@ -44,12 +41,14 @@ import com.exactpro.th2.rptdataprovider.handlers.SearchMessagesHandler
import com.exactpro.th2.rptdataprovider.producers.EventProducer
import com.exactpro.th2.rptdataprovider.producers.MessageProducer
import com.exactpro.th2.rptdataprovider.server.ServerType
import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService
import com.exactpro.th2.rptdataprovider.services.cradle.CradleService
import com.exactpro.th2.rptdataprovider.services.grpc.GrpcService
import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.http.*
import io.ktor.http.CacheControl

@Suppress("MemberVisibilityCanBePrivate")
class Context(
Expand All @@ -68,26 +67,36 @@ class Context(
val messageRouterRawBatch: MessageRouter<MessageGroupBatch>,

val messageRouterParsedBatch: MessageRouter<MessageGroupBatch>,

val grpcRouter: GrpcRouter,

val grpcConfig: GrpcConfiguration,

val cradleService: CradleService = CradleService(
configuration,
cradleManager
),

val rabbitMqService: RabbitMqService = RabbitMqService(
configuration,
messageRouterParsedBatch,
messageRouterRawBatch
),
val decoderService: AbstractDecoderService = if (configuration.codecUseGrpc.value.toBoolean()) {
GrpcService(
configuration,
grpcRouter
)
} else {
RabbitMqService(
configuration,
messageRouterParsedBatch,
messageRouterRawBatch
)
},

val eventProducer: EventProducer = EventProducer(cradleService, jacksonMapper),

val eventCache: EventCache = EventCache(cacheTimeout, configuration.eventCacheSize.value.toLong(), eventProducer),

val messageProducer: MessageProducer = MessageProducer(
cradleService,
rabbitMqService
decoderService
),

val messageCache: MessageCache = MessageCache(configuration, messageProducer),
Expand Down Expand Up @@ -145,4 +154,4 @@ class Context(
}
}
}
}
}
5 changes: 3 additions & 2 deletions src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,6 +77,7 @@ class Main {
messageRouterParsedBatch = configurationFactory.messageRouterMessageGroupBatch.also {
resources += it
},
grpcRouter = configurationFactory.grpcRouter,
grpcConfig = configurationFactory.grpcConfiguration
)
}
Expand Down Expand Up @@ -174,4 +175,4 @@ fun main(args: Array<String>) {
logger.error(ex) { "Cannot start the box" }
exitProcess(1)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@ class CustomConfigurationClass {
val codecCallbackThreadPool: Int = 10
val codecRequestThreadPool: Int = 1
val codecUsePinAttributes: Boolean = true
val codecUseGrpc: Boolean = false

val grpcWriterMessageBuffer: Int = 100

Expand Down Expand Up @@ -205,4 +206,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) {

val codecUsePinAttributes: Variable =
Variable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes.toString(), "true")
}

val codecUseGrpc: Variable =
Variable("codecUseGrpc", customConfiguration.codecUseGrpc.toString(), "false")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package com.exactpro.th2.rptdataprovider.entities.internal

import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.rptdataprovider.entities.responses.MessageBatchWrapper
import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest
import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchResponse
import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest
import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse
import java.time.Instant

data class StreamEndObject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import com.exactpro.th2.rptdataprovider.Context
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineCodecRequest
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import com.exactpro.th2.rptdataprovider.entities.responses.MessageBatchWrapper
import com.exactpro.th2.rptdataprovider.handlers.PipelineComponent
import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus
import com.exactpro.th2.rptdataprovider.handlers.StreamName
import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest
import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -131,4 +130,4 @@ class MessageBatchConverter(
sendToChannel(pipelineMessage)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchReques
import com.exactpro.th2.rptdataprovider.handlers.PipelineComponent
import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus
import com.exactpro.th2.rptdataprovider.handlers.StreamName
import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchResponse
import com.exactpro.th2.rptdataprovider.services.CodecBatchResponse
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.isActive
Expand Down Expand Up @@ -106,7 +106,7 @@ class MessageBatchDecoder(
pipelineMessage.lastProcessedId,
pipelineMessage.lastScannedTime,
pipelineMessage.storedBatchWrapper,
context.rabbitMqService.sendToCodec(pipelineMessage.codecRequest),
context.decoderService.sendToCodec(pipelineMessage.codecRequest),
protocol
)
pipelineStatus.decodeEnd(streamName.toString(), pipelineMessage.codecRequest.protobufRawMessageBatch.groupsCount.toLong())
Expand All @@ -125,4 +125,4 @@ class MessageBatchDecoder(
sendToChannel(pipelineMessage)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ class StreamMerger(
private fun getLastScannedObject(): PipelineStepObject? {
return if (searchRequest.searchDirection == TimeRelation.AFTER) {
messageStreams
.maxBy { it.currentElement?.lastScannedTime ?: Instant.MIN }
.maxByOrNull { it.currentElement?.lastScannedTime ?: Instant.MIN }
?.previousElement
} else {
messageStreams
.minBy { it.currentElement?.lastScannedTime ?: Instant.MIN }
.minByOrNull { it.currentElement?.lastScannedTime ?: Instant.MIN }
?.previousElement
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,17 +20,15 @@ import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.common.grpc.*
import com.exactpro.th2.rptdataprovider.entities.internal.BodyWrapper
import com.exactpro.th2.rptdataprovider.entities.internal.Message
import com.exactpro.th2.rptdataprovider.handlers.StreamName
import com.exactpro.th2.rptdataprovider.services.cradle.CradleMessageNotFoundException
import com.exactpro.th2.rptdataprovider.services.cradle.CradleService
import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest
import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService
import com.exactpro.th2.rptdataprovider.services.CodecBatchRequest
import com.exactpro.th2.rptdataprovider.services.AbstractDecoderService

class MessageProducer(
private val cradle: CradleService,
private val rabbitMqService: RabbitMqService
private val rabbitMqService: AbstractDecoderService
) {

suspend fun fromId(id: StoredMessageId): Message {

return cradle.getMessageSuspend(id)?.let { stored ->
Expand Down Expand Up @@ -62,8 +60,6 @@ class MessageProducer(

Message(stored, decoded, stored.content, setOf())
}

?: throw CradleMessageNotFoundException("message '${id}' does not exist in cradle")
}
}

}
Loading