From 13e962732a921301384a6b0fdf9e3a286566938c Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Fri, 27 Jun 2025 10:44:41 +0400 Subject: [PATCH 1/7] Provided ability to ignore `LimitForParent` sse event request argument using `ignoreLimitForParent` option --- README.md | 80 +++++++++-------- gradle.properties | 2 +- .../entities/configuration/Configuration.kt | 85 +++++++++---------- .../entities/configuration/Variable.kt | 13 +-- .../handlers/IParentEventCounter.kt | 9 +- .../handlers/SearchEventsHandler.kt | 20 +---- .../handlers/IParentEventCounterTest.kt | 4 +- 7 files changed, 103 insertions(+), 110 deletions(-) diff --git a/README.md b/README.md index 5eda606f..52d3cf38 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Report data provider (5.16.0) +# Report data provider (5.17.0) # Overview This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources. @@ -185,70 +185,71 @@ id: event / message id | null | null | null # Configuration schema component description example (rpt-data-provider.yml): -``` +```yaml apiVersion: th2.exactpro.com/v2 kind: Th2CoreBox metadata: name: rpt-data-provider spec: imageName: ghcr.io/th2-net/th2-rpt-data-provider - imageVersion: 5.7.0 // change this line if you want to use a newer version + imageVersion: 5.17.0 # change this line if you want to use a newer version type: th2-rpt-data-provider customConfig: hostname: localhost port: 8080 - responseTimeout: 60000 // maximum request processing time in milliseconds + responseTimeout: 60000 # maximum request processing time in milliseconds - eventCacheSize: 1000 // internal event cache size - messageCacheSize: 1000 // internal message cache size - serverCacheTimeout: 60000 // cached event lifetime in milliseconds + eventCacheSize: 1000 # internal event cache size + messageCacheSize: 1000 # internal message cache size + serverCacheTimeout: 60000 # cached event lifetime in milliseconds - ioDispatcherThreadPoolSize: 10 // thread pool size for blocking database calls - codecResponseTimeout: 6000 // if a codec doesn't respond in time, requested message is returned with a 'null' body - checkRequestsAliveDelay: 2000 // response channel check interval in milliseconds + ioDispatcherThreadPoolSize: 10 # thread pool size for blocking database calls + codecResponseTimeout: 6000 # if a codec doesn't respond in time, requested message is returned with a 'null' body + checkRequestsAliveDelay: 2000 # response channel check interval in milliseconds - enableCaching: true // enables proxy and client cache (Cache-control response headers) - notModifiedObjectsLifetime: 3600 // max-age in seconds - rarelyModifiedObjects: 500 // max-age in seconds + enableCaching: true # enables proxy and client cache (Cache-control response headers) + notModifiedObjectsLifetime: 3600 # max-age in seconds + rarelyModifiedObjects: 500 # max-age in seconds - sseEventSearchStep: 200 // step size in seconds when requesting events - keepAliveTimeout: 5000 // timeout in milliseconds. keep_alive sending frequency - cradleDispatcherPoolSize: 1 // number of threads in the cradle dispatcher + sseEventSearchStep: 200 # step size in seconds when requesting events + keepAliveTimeout: 5000 # timeout in milliseconds. keep_alive sending frequency + ignoreLimitForParent: false # ignore LimitForParent sse event request argument + cradleDispatcherPoolSize: 1 # number of threads in the cradle dispatcher - messageExtractorOutputBatchBuffer: 1 // buffer size of message search pipeline + messageExtractorOutputBatchBuffer: 1 # buffer size of message search pipeline messageConverterOutputBatchBuffer: 1 messageDecoderOutputBatchBuffer: 1 messageUnpackerOutputMessageBuffer: 100 messageFilterOutputMessageBuffer: 100 messageMergerOutputMessageBuffer: 10 - messageIdsLookupLimitDays: 7 // lookup limit value for seacing next and previous message ids. + messageIdsLookupLimitDays: 7 # lookup limit value for seacing next and previous message ids. - codecPendingBatchLimit: 16 // the total number of messages sent to the codec batches in parallel for all pipelines - codecCallbackThreadPool: 4 // thread pool for parsing messages received from codecs - codecRequestThreadPool: 1 // thread pool for sending message to codecs - grpcWriterMessageBuffer: 10 // buffer before send grpc response + codecPendingBatchLimit: 16 # the total number of messages sent to the codec batches in parallel for all pipelines + codecCallbackThreadPool: 4 # thread pool for parsing messages received from codecs + codecRequestThreadPool: 1 # thread pool for sending message to codecs + grpcWriterMessageBuffer: 10 # buffer before send grpc response - sendEmptyDelay: 100 // frequency of sending empty messages + sendEmptyDelay: 100 # frequency of sending empty messages - eventSearchChunkSize: 64 // the size of event chunks during sse search and the maximum size of the batch of messages upon request getEvents + eventSearchChunkSize: 64 # the size of event chunks during sse search and the maximum size of the batch of messages upon request getEvents - grpcThreadPoolSize: 20 // thread pool size for grpc requests + grpcThreadPoolSize: 20 # thread pool size for grpc requests - useStrictMode: false // if true throw an exception when bad messages are received from the codec otherwise return messages with null body and type + useStrictMode: false # if true throw an exception when bad messages are received from the codec otherwise return messages with null body and type - serverType: HTTP // provider server type. Allows 'HTTP' and 'GRPC' (case sensetive). + serverType: HTTP # provider server type. Allows 'HTTP' and 'GRPC' (case sensetive). - codecUsePinAttributes: true // send raw message to specified codec (true) or send to all codecs (false) + codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false) - eventSearchTimeOffset: 5000 // sets the offset in milliseconds on search events. (startTimestamp - eventSearchTimeOffset) and (endTimestamp + eventSearchTimeOffset) + eventSearchTimeOffset: 5000 # sets the offset in milliseconds on search events. (startTimestamp - eventSearchTimeOffset) and (endTimestamp + eventSearchTimeOffset) - searchBySessionGroup: true // if true data-provider uses the session alias to group cache and translates http / gRPC requests by session alias to group th2 storage request - aliasToGroupCacheSize: 1000 // the size of cache for the mapping between session alias and group. + searchBySessionGroup: true # if true data-provider uses the session alias to group cache and translates http / gRPC requests by session alias to group th2 storage request + aliasToGroupCacheSize: 1000 # the size of cache for the mapping between session alias and group. - useTransportMode: true // if true data-provider uses th2 transport protocol to interact with thw codecs + useTransportMode: true # if true data-provider uses th2 transport protocol to interact with thw codecs - pins: // pins are used to communicate with codec components to parse message data + pins: # pins are used to communicate with codec components to parse message data mq: subscribers: - name: from_codec @@ -276,8 +277,10 @@ spec: ref: schema-stable path: custom-component service: - enabled: false - nodePort: '31275' + enabled: true + clusterIP: + - name: http + containerPort: 8080 envVariables: JAVA_TOOL_OPTIONS: > -XX:+ExitOnOutOfMemoryError @@ -296,6 +299,13 @@ spec: # Release notes +## 5.17.0 +* Provided ability to ignore `LimitForParent` sse event request argument using `ignoreLimitForParent` option +* Updated: + * cradle API: `5.6.0-dev` + * kotlin: `2.2.0` + * kotlin-logging: `7.0.7` + ## 5.16.0 * Migrated to ktor: `3.1.2` * Updated: diff --git a/gradle.properties b/gradle.properties index 972a8cbc..eb79f062 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=5.16.0 +release_version=5.17.0 docker_image_name= \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt index 0d2030b4..c5e24a8c 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ class CustomConfigurationClass { val sseEventSearchStep: Long = 200 val keepAliveTimeout: Long = 5_000 + val ignoreLimitForParent: Boolean = false val messageExtractorOutputBatchBuffer: Int = 1 val messageConverterOutputBatchBuffer: Int = 1 @@ -84,142 +85,138 @@ class Configuration(customConfiguration: CustomConfigurationClass) { } val hostname: Variable = - Variable("hostname", customConfiguration.hostname, "localhost") + Variable("hostname", customConfiguration.hostname) val port: Variable = - Variable("port", customConfiguration.port.toString(), "8080") + Variable("port", customConfiguration.port.toString()) val responseTimeout: Variable = - Variable("responseTimeout", customConfiguration.responseTimeout.toString(), "60000") + Variable("responseTimeout", customConfiguration.responseTimeout.toString()) val serverCacheTimeout: Variable = - Variable("serverCacheTimeout", customConfiguration.serverCacheTimeout.toString(), "60000") + Variable("serverCacheTimeout", customConfiguration.serverCacheTimeout.toString()) val eventCacheSize: Variable = - Variable("eventCacheSize", customConfiguration.eventCacheSize.toString(), "1") + Variable("eventCacheSize", customConfiguration.eventCacheSize.toString()) val messageCacheSize: Variable = - Variable("messageCacheSize", customConfiguration.messageCacheSize.toString(), "1") + Variable("messageCacheSize", customConfiguration.messageCacheSize.toString()) val aliasToGroupCacheSize: Variable = // TODO: added value check - Variable("aliasToGroupCacheSize", customConfiguration.aliasToGroupCacheSize.toString(), "1000") + Variable("aliasToGroupCacheSize", customConfiguration.aliasToGroupCacheSize.toString()) val useTransportMode: Variable = - Variable("useTransportMode", customConfiguration.useTransportMode.toString(), "true") + Variable("useTransportMode", customConfiguration.useTransportMode.toString()) val ioDispatcherThreadPoolSize: Variable = Variable("ioDispatcherThreadPoolSize", customConfiguration.ioDispatcherThreadPoolSize.let { if (it < 10) logger.warn { "The optimal value of the ioDispatcherThreadPoolSize is 10. Current: $it" } it.toString() - }, "10") + }) val checkRequestsAliveDelay: Variable = - Variable("checkRequestsAliveDelay", customConfiguration.checkRequestsAliveDelay.toString(), "2000") + Variable("checkRequestsAliveDelay", customConfiguration.checkRequestsAliveDelay.toString()) - val enableCaching: Variable = Variable("enableCaching", customConfiguration.enableCaching.toString(), "true") + val enableCaching: Variable = Variable("enableCaching", customConfiguration.enableCaching.toString()) val notModifiedObjectsLifetime: Variable = - Variable("notModifiedObjectsLifetime", customConfiguration.notModifiedObjectsLifetime.toString(), "3600") + Variable("notModifiedObjectsLifetime", customConfiguration.notModifiedObjectsLifetime.toString()) val rarelyModifiedObjects: Variable = - Variable("rarelyModifiedObjects", customConfiguration.rarelyModifiedObjects.toString(), "500") + Variable("rarelyModifiedObjects", customConfiguration.rarelyModifiedObjects.toString()) val sseEventSearchStep: Variable = - Variable("sseEventSearchStep", customConfiguration.sseEventSearchStep.toString(), "200") + Variable("sseEventSearchStep", customConfiguration.sseEventSearchStep.toString()) val keepAliveTimeout: Variable = - Variable("keepAliveTimeout", customConfiguration.keepAliveTimeout.toString(), "5000") + Variable("keepAliveTimeout", customConfiguration.keepAliveTimeout.toString()) + + val ignoreLimitForParent: Variable = + Variable("ignoreLimitForParent", customConfiguration.ignoreLimitForParent.toString()) val messageExtractorOutputBatchBuffer: Variable = Variable( "messageExtractorOutputBatchBuffer", - customConfiguration.messageExtractorOutputBatchBuffer.toString(), - "2" + customConfiguration.messageExtractorOutputBatchBuffer.toString() ) val messageConverterOutputBatchBuffer: Variable = Variable( "messageConverterOutputBatchBuffer", - customConfiguration.messageConverterOutputBatchBuffer.toString(), - "2" + customConfiguration.messageConverterOutputBatchBuffer.toString() ) val messageDecoderOutputBatchBuffer: Variable = Variable( "messageDecoderOutputBatchBuffer", - customConfiguration.messageDecoderOutputBatchBuffer.toString(), - "2" + customConfiguration.messageDecoderOutputBatchBuffer.toString() ) val messageUnpackerOutputMessageBuffer: Variable = Variable( "messageUnpackerOutputMessageBuffer", - customConfiguration.messageUnpackerOutputMessageBuffer.toString(), - "1000" + customConfiguration.messageUnpackerOutputMessageBuffer.toString() ) val messageFilterOutputMessageBuffer: Variable = Variable( "messageFilterOutputMessageBuffer", - customConfiguration.messageFilterOutputMessageBuffer.toString(), - "1000" + customConfiguration.messageFilterOutputMessageBuffer.toString() ) val messageMergerOutputMessageBuffer: Variable = Variable( "messageMergerOutputMessageBuffer", - customConfiguration.messageMergerOutputMessageBuffer.toString(), - "10" + customConfiguration.messageMergerOutputMessageBuffer.toString() ) val messageIdsLookupLimitDays: Variable = Variable( "messageIdsLookupLimitDays", - customConfiguration.messageIdsLookupLimitDays.toString(), - "7" + customConfiguration.messageIdsLookupLimitDays.toString() ) val codecResponseTimeout: Variable = Variable( "codecResponseTimeout", - customConfiguration.codecResponseTimeout.toString(), "6000" + customConfiguration.codecResponseTimeout.toString() ) val codecPendingBatchLimit: Variable = - Variable("codecPendingBatchLimit", customConfiguration.codecPendingBatchLimit.toString(), "200") + Variable("codecPendingBatchLimit", customConfiguration.codecPendingBatchLimit.toString()) val codecCallbackThreadPool: Variable = - Variable("codecCallbackThreadPool", customConfiguration.codecCallbackThreadPool.toString(), "10") + Variable("codecCallbackThreadPool", customConfiguration.codecCallbackThreadPool.toString()) val codecRequestThreadPool: Variable = - Variable("codecRequestThreadPool", customConfiguration.codecRequestThreadPool.toString(), "1") + Variable("codecRequestThreadPool", customConfiguration.codecRequestThreadPool.toString()) val grpcWriterMessageBuffer: Variable = - Variable("grpcWriterMessageBuffer", customConfiguration.grpcWriterMessageBuffer.toString(), "100") + Variable("grpcWriterMessageBuffer", customConfiguration.grpcWriterMessageBuffer.toString()) val cradleDispatcherPoolSize: Variable = - Variable("cradleDispatcherPoolSize", customConfiguration.cradleDispatcherPoolSize.toString(), "1") + Variable("cradleDispatcherPoolSize", customConfiguration.cradleDispatcherPoolSize.toString()) val sendEmptyDelay: Variable = - Variable("sendEmptyDelay", customConfiguration.sendEmptyDelay.toString(), "100") + Variable("sendEmptyDelay", customConfiguration.sendEmptyDelay.toString()) val eventSearchChunkSize: Variable = - Variable("eventSearchChunkSize", customConfiguration.eventSearchChunkSize.toString(), "64") + Variable("eventSearchChunkSize", customConfiguration.eventSearchChunkSize.toString()) val useStrictMode: Variable = - Variable("useStrictMode", customConfiguration.useStrictMode.toString(), "false") + Variable("useStrictMode", customConfiguration.useStrictMode.toString()) val serverType: Variable = - Variable("serverType", customConfiguration.serverType.toString(), "HTTP") + Variable("serverType", customConfiguration.serverType.toString()) val codecUsePinAttributes: Variable = - Variable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes.toString(), "true") + Variable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes.toString()) val grpcThreadPoolSize: Variable = - Variable("grpcThreadPoolSize", customConfiguration.grpcThreadPoolSize.toString(), "20") + Variable("grpcThreadPoolSize", customConfiguration.grpcThreadPoolSize.toString()) val eventSearchTimeOffset: Variable = - Variable("eventSearchTimeOffset", customConfiguration.eventSearchTimeOffset.toString(), "5000") + Variable("eventSearchTimeOffset", customConfiguration.eventSearchTimeOffset.toString()) val eventSearchGap: Variable = - Variable("eventSearchGap", customConfiguration.eventSearchGap.toString(), "60") + Variable("eventSearchGap", customConfiguration.eventSearchGap.toString()) } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Variable.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Variable.kt index b7b62f7a..f9ba7406 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Variable.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Variable.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging class Variable( name: String, - param: String?, - defaultValue: String, + param: String, showInLog: Boolean = true ) { private val logger = KotlinLogging.logger { } @@ -30,13 +29,9 @@ class Variable( val value: String = param .also { logger.info { - val valueToLog = if (showInLog) it ?: defaultValue else "*****" + val valueToLog = if (showInLog) it else "*****" - if (it == null) - "property '$name' is not set - defaulting to '$valueToLog'" - else - "property '$name' is set to '$valueToLog'" + "property '$name' is set to '$valueToLog'" } } - ?: defaultValue } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt index 34f32461..8ccf9dfb 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -1,5 +1,5 @@ /* - * Copyright 2024 Exactpro (Exactpro Systems Limited) + * Copyright 2024-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,10 @@ internal interface IParentEventCounter { companion object { private val MAX_EVENT_COUNTER = AtomicLong(Long.MAX_VALUE) - fun create(limitForParent: Long? = null): IParentEventCounter = - limitForParent?.let { LimitedParentEventCounter(it) } ?: NoLimitedParentEventCounter + fun create(limitForParent: Long? = null, ignoreLimitForParent: Boolean = false): IParentEventCounter = if (ignoreLimitForParent || limitForParent == null) { + NoLimitedParentEventCounter + } else { + LimitedParentEventCounter(limitForParent) + } } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt index ba2a86b0..a952be29 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,6 +83,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { private val sseEventSearchStep: Long = context.configuration.sseEventSearchStep.value.toLong() private val eventSearchChunkSize: Int = context.configuration.eventSearchChunkSize.value.toInt() private val keepAliveTimeout: Long = context.configuration.keepAliveTimeout.value.toLong() + private val ignoreLimitForParent: Boolean = context.configuration.ignoreLimitForParent.value.toBoolean() private suspend fun keepAlive( writer: StreamWriter<*, *>, @@ -244,21 +245,8 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { } } - private suspend fun dropByTimestampFilter( - request: SseEventSearchRequest, resumeFromTimestamp: Instant - ): (BaseEventEntity) -> Boolean { - return { event: BaseEventEntity -> - if (request.searchDirection == AFTER) { - event.startTimestamp.isBeforeOrEqual(resumeFromTimestamp) - } else { - event.startTimestamp.isAfterOrEqual(resumeFromTimestamp) - } - } - } - - @ExperimentalCoroutinesApi - private suspend fun dropBeforeResumeId( + private fun dropBeforeResumeId( eventFlow: Flow, resumeFromId: ProviderEventId, ): Flow { @@ -294,7 +282,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { requireNotNull(resumeTimestamp) { "timestamp for $resumeProviderId cannot be extracted" } } val timeIntervals = getTimeIntervals(request, sseEventSearchStep, startTimestamp) - val parentEventCounter = IParentEventCounter.create(request.limitForParent) + val parentEventCounter = IParentEventCounter.create(request.limitForParent, ignoreLimitForParent) flow { for ((start, end) in timeIntervals) { diff --git a/src/test/kotlin/handlers/IParentEventCounterTest.kt b/src/test/kotlin/handlers/IParentEventCounterTest.kt index 8361cec0..91b0c911 100644 --- a/src/test/kotlin/handlers/IParentEventCounterTest.kt +++ b/src/test/kotlin/handlers/IParentEventCounterTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2024 Exactpro (Exactpro Systems Limited) + * Copyright 2024-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ class IParentEventCounterTest { @Test fun `no limit test`() { - val eventCounter = IParentEventCounter.create(null) + val eventCounter = IParentEventCounter.create() val rootEventId = NEXT_UUID val parentEventId = ProviderEventId( From 1734d250304cb818870f00c93d081068db09137b Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Mon, 30 Jun 2025 11:08:11 +0400 Subject: [PATCH 2/7] Crated HashParentEventCounter --- .../entities/configuration/Configuration.kt | 4 +- .../handlers/IParentEventCounter.kt | 67 ++++++++++++++----- .../handlers/SearchEventsHandler.kt | 2 +- .../handlers/IParentEventCounterTest.kt | 23 ++++--- 4 files changed, 69 insertions(+), 27 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt index c5e24a8c..9347795f 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt @@ -41,7 +41,7 @@ class CustomConfigurationClass { val sseEventSearchStep: Long = 200 val keepAliveTimeout: Long = 5_000 - val ignoreLimitForParent: Boolean = false + val ignoreLimitForParent: String = "none" val messageExtractorOutputBatchBuffer: Int = 1 val messageConverterOutputBatchBuffer: Int = 1 @@ -133,7 +133,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) { Variable("keepAliveTimeout", customConfiguration.keepAliveTimeout.toString()) val ignoreLimitForParent: Variable = - Variable("ignoreLimitForParent", customConfiguration.ignoreLimitForParent.toString()) + Variable("ignoreLimitForParent", customConfiguration.ignoreLimitForParent) val messageExtractorOutputBatchBuffer: Variable = Variable( diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt index 8ccf9dfb..9e7bdbc8 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -17,8 +17,8 @@ package com.exactpro.th2.rptdataprovider.handlers import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity +import java.security.MessageDigest import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicLong internal interface IParentEventCounter { /** @@ -35,42 +35,79 @@ internal interface IParentEventCounter { private class LimitedParentEventCounter( private val limitForParent: Long ) : IParentEventCounter { - private val parentEventCounter = ConcurrentHashMap() + private val parentEventCounter = ConcurrentHashMap() override fun checkCountAndGet(event: BaseEventEntity): Boolean { if (event.parentEventId == null) { return true } - val value = parentEventCounter.compute(event.parentEventId.eventId.id) { _, value -> + return parentEventCounter.compute(event.parentEventId.eventId.id) { _, value -> if (value == null) { - AtomicLong(1) + 1L } else { - if (value === MAX_EVENT_COUNTER) { + val next = value + 1 + if (value == MAX_EVENT_COUNTER || next > limitForParent) { parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) MAX_EVENT_COUNTER } else { - if (value.incrementAndGet() > limitForParent) { - parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) - MAX_EVENT_COUNTER - } else { - value - } + next } } + } != MAX_EVENT_COUNTER + } + } + + private class HashParentEventCounter( + private val limitForParent: Long + ) : IParentEventCounter { + private val parentEventCounter = ConcurrentHashMap() + + override fun checkCountAndGet(event: BaseEventEntity): Boolean { + if (event.parentEventId == null) { + return true } + return parentEventCounter.compute(event.parentEventId.eventId.id.toLongHash()) { _, value -> + if (value == null) { + 1L + } else { + val next = value + 1 + if (value == MAX_EVENT_COUNTER || next > limitForParent) { + parentEventCounter.putIfAbsent(event.id.eventId.id.toLongHash(), MAX_EVENT_COUNTER) + MAX_EVENT_COUNTER + } else { + next + } + } + } != MAX_EVENT_COUNTER + } - return value !== MAX_EVENT_COUNTER + companion object { + private val messageDigest = ThreadLocal.withInitial { MessageDigest.getInstance("MD5") } + + fun String.toLongHash(): Long { + val hashBytes = messageDigest.get().digest(this.toByteArray()) + + var longHash: Long = 0 + for (i in 0 until 8) { + longHash = (longHash shl 8) or (hashBytes[i].toLong() and 0xFF) + } + return longHash + } } } companion object { - private val MAX_EVENT_COUNTER = AtomicLong(Long.MAX_VALUE) + private const val MAX_EVENT_COUNTER = Long.MAX_VALUE - fun create(limitForParent: Long? = null, ignoreLimitForParent: Boolean = false): IParentEventCounter = if (ignoreLimitForParent || limitForParent == null) { + fun create(limitForParent: Long? = null, mode: String = "limit"): IParentEventCounter = if ((mode != "limit" && mode != "hash") || limitForParent == null) { NoLimitedParentEventCounter } else { - LimitedParentEventCounter(limitForParent) + when(mode) { + "hash" -> HashParentEventCounter(limitForParent) + "limit" -> LimitedParentEventCounter(limitForParent) + else -> LimitedParentEventCounter(limitForParent) + } } } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt index a952be29..89a237e2 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt @@ -83,7 +83,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { private val sseEventSearchStep: Long = context.configuration.sseEventSearchStep.value.toLong() private val eventSearchChunkSize: Int = context.configuration.eventSearchChunkSize.value.toInt() private val keepAliveTimeout: Long = context.configuration.keepAliveTimeout.value.toLong() - private val ignoreLimitForParent: Boolean = context.configuration.ignoreLimitForParent.value.toBoolean() + private val ignoreLimitForParent: String = context.configuration.ignoreLimitForParent.value private suspend fun keepAlive( writer: StreamWriter<*, *>, diff --git a/src/test/kotlin/handlers/IParentEventCounterTest.kt b/src/test/kotlin/handlers/IParentEventCounterTest.kt index 91b0c911..613e4ca1 100644 --- a/src/test/kotlin/handlers/IParentEventCounterTest.kt +++ b/src/test/kotlin/handlers/IParentEventCounterTest.kt @@ -24,6 +24,8 @@ import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import java.time.Instant import java.util.UUID @@ -87,10 +89,11 @@ class IParentEventCounterTest { ) } - @Test - fun `limit root event test`() { + @ParameterizedTest + @ValueSource(strings = ["limit", "hash"]) + fun `limit root event test`(mode: String) { val limitForParent = 50 - val eventCounter = IParentEventCounter.create(limitForParent.toLong()) + val eventCounter = IParentEventCounter.create(limitForParent.toLong(), mode) val rootEventId = NEXT_UUID @@ -118,10 +121,11 @@ class IParentEventCounterTest { } } - @Test - fun `singe event test`() { + @ParameterizedTest + @ValueSource(strings = ["limit", "hash"]) + fun `singe event test`(mode: String) { val limitForParent = 50 - val eventCounter = IParentEventCounter.create(limitForParent.toLong()) + val eventCounter = IParentEventCounter.create(limitForParent.toLong(), mode) val parentEventId = ProviderEventId( batchId = null, @@ -179,10 +183,11 @@ class IParentEventCounterTest { ) } - @Test - fun `batched event test`() { + @ParameterizedTest + @ValueSource(strings = ["limit", "hash"]) + fun `batched event test`(mode: String) { val limitForParent = 50 - val eventCounter = IParentEventCounter.create(limitForParent.toLong()) + val eventCounter = IParentEventCounter.create(limitForParent.toLong(), mode) val parentEventId = ProviderEventId( batchId = null, From 2600c281c3286f8eb4cf172c9b3569ebc2faaacd Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Mon, 30 Jun 2025 12:01:05 +0400 Subject: [PATCH 3/7] Corrected HashParentEventCounter --- .../th2/rptdataprovider/handlers/IParentEventCounter.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt index 9e7bdbc8..2fb6f030 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -100,12 +100,11 @@ internal interface IParentEventCounter { companion object { private const val MAX_EVENT_COUNTER = Long.MAX_VALUE - fun create(limitForParent: Long? = null, mode: String = "limit"): IParentEventCounter = if ((mode != "limit" && mode != "hash") || limitForParent == null) { + fun create(limitForParent: Long? = null, mode: String = "limit"): IParentEventCounter = if (limitForParent == null) { NoLimitedParentEventCounter } else { when(mode) { "hash" -> HashParentEventCounter(limitForParent) - "limit" -> LimitedParentEventCounter(limitForParent) else -> LimitedParentEventCounter(limitForParent) } } From c0bb4f3ac621cee281df036fe881841fef676a84 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Mon, 30 Jun 2025 12:34:33 +0400 Subject: [PATCH 4/7] Doesn't store event if batched --- .../handlers/IParentEventCounter.kt | 8 +++++-- .../handlers/IParentEventCounterTest.kt | 24 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt index 2fb6f030..dc9dd40e 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -48,7 +48,9 @@ internal interface IParentEventCounter { } else { val next = value + 1 if (value == MAX_EVENT_COUNTER || next > limitForParent) { - parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) + if (event.batchId == null) { + parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) + } MAX_EVENT_COUNTER } else { next @@ -73,7 +75,9 @@ internal interface IParentEventCounter { } else { val next = value + 1 if (value == MAX_EVENT_COUNTER || next > limitForParent) { - parentEventCounter.putIfAbsent(event.id.eventId.id.toLongHash(), MAX_EVENT_COUNTER) + if (event.batchId == null) { + parentEventCounter.putIfAbsent(event.id.eventId.id.toLongHash(), MAX_EVENT_COUNTER) + } MAX_EVENT_COUNTER } else { next diff --git a/src/test/kotlin/handlers/IParentEventCounterTest.kt b/src/test/kotlin/handlers/IParentEventCounterTest.kt index 613e4ca1..eee6fb6a 100644 --- a/src/test/kotlin/handlers/IParentEventCounterTest.kt +++ b/src/test/kotlin/handlers/IParentEventCounterTest.kt @@ -193,7 +193,7 @@ class IParentEventCounterTest { batchId = null, eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), ) - val batchId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID) + var batchId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID) repeat(limitForParent) { assertTrue( @@ -211,6 +211,7 @@ class IParentEventCounterTest { } val nextEventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID) + batchId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID) assertAll( { assertFalse( @@ -226,6 +227,25 @@ class IParentEventCounterTest { "single event id, attempt ${limitForParent + 1}", ) }, + { + repeat(limitForParent) { + assertTrue( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = batchId, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ), + ProviderEventId( + batchId = batchId, + eventId = nextEventId + ) + ), + ), + "child of single event id, attempt ${limitForParent + 1}", + ) + } + }, { assertFalse( eventCounter.checkCountAndGet( @@ -240,7 +260,7 @@ class IParentEventCounterTest { ) ), ), - "child of single event id, attempt ${limitForParent + 1}", + "child of single event id, attempt", ) }, ) From dad79bc975f577bdaf5505c33e0fe66a77e77b31 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Tue, 1 Jul 2025 10:29:10 +0400 Subject: [PATCH 5/7] Added th2_rpt_parent_event_count metric --- .../handlers/IParentEventCounter.kt | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt index dc9dd40e..a33373f3 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -17,6 +17,8 @@ package com.exactpro.th2.rptdataprovider.handlers import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity +import io.prometheus.client.Gauge +import java.lang.ref.Cleaner import java.security.MessageDigest import java.util.concurrent.ConcurrentHashMap @@ -37,6 +39,12 @@ internal interface IParentEventCounter { ) : IParentEventCounter { private val parentEventCounter = ConcurrentHashMap() + init { + CLEANER.register(this) { + PARENT_EVENT_COUNTER.dec(parentEventCounter.size.toDouble()) + } + } + override fun checkCountAndGet(event: BaseEventEntity): Boolean { if (event.parentEventId == null) { return true @@ -44,12 +52,15 @@ internal interface IParentEventCounter { return parentEventCounter.compute(event.parentEventId.eventId.id) { _, value -> if (value == null) { + PARENT_EVENT_COUNTER.inc() 1L } else { val next = value + 1 if (value == MAX_EVENT_COUNTER || next > limitForParent) { if (event.batchId == null) { - parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) + if (parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) == null) { + PARENT_EVENT_COUNTER.inc() + } } MAX_EVENT_COUNTER } else { @@ -65,18 +76,27 @@ internal interface IParentEventCounter { ) : IParentEventCounter { private val parentEventCounter = ConcurrentHashMap() + init { + CLEANER.register(this) { + PARENT_EVENT_COUNTER.dec(parentEventCounter.size.toDouble()) + } + } + override fun checkCountAndGet(event: BaseEventEntity): Boolean { if (event.parentEventId == null) { return true } return parentEventCounter.compute(event.parentEventId.eventId.id.toLongHash()) { _, value -> if (value == null) { + PARENT_EVENT_COUNTER.inc() 1L } else { val next = value + 1 if (value == MAX_EVENT_COUNTER || next > limitForParent) { if (event.batchId == null) { - parentEventCounter.putIfAbsent(event.id.eventId.id.toLongHash(), MAX_EVENT_COUNTER) + if (parentEventCounter.putIfAbsent(event.id.eventId.id.toLongHash(), MAX_EVENT_COUNTER) == null) { + PARENT_EVENT_COUNTER.inc() + } } MAX_EVENT_COUNTER } else { @@ -102,6 +122,12 @@ internal interface IParentEventCounter { } companion object { + private val PARENT_EVENT_COUNTER = Gauge + .build("th2_rpt_parent_event_count", "Number of parent events are cached in memory") + .register() + + private val CLEANER = Cleaner.create() + private const val MAX_EVENT_COUNTER = Long.MAX_VALUE fun create(limitForParent: Long? = null, mode: String = "limit"): IParentEventCounter = if (limitForParent == null) { From 23a46ad8ca6cac5faba2b12fa7637a00956661d4 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Wed, 2 Jul 2025 10:34:38 +0400 Subject: [PATCH 6/7] count only potential parent events (single parent event) --- .../handlers/IParentEventCounter.kt | 15 +++++++-------- .../kotlin/handlers/IParentEventCounterTest.kt | 11 ++++------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt index a33373f3..e21cb42b 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -46,9 +46,8 @@ internal interface IParentEventCounter { } override fun checkCountAndGet(event: BaseEventEntity): Boolean { - if (event.parentEventId == null) { - return true - } + if (event.parentEventId == null) return true // exclude root events + if (event.parentEventId.batchId != null) return true // exclude parents inside batch return parentEventCounter.compute(event.parentEventId.eventId.id) { _, value -> if (value == null) { @@ -57,7 +56,7 @@ internal interface IParentEventCounter { } else { val next = value + 1 if (value == MAX_EVENT_COUNTER || next > limitForParent) { - if (event.batchId == null) { + if (!event.isBatched) { // exclude batched events if (parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) == null) { PARENT_EVENT_COUNTER.inc() } @@ -83,9 +82,9 @@ internal interface IParentEventCounter { } override fun checkCountAndGet(event: BaseEventEntity): Boolean { - if (event.parentEventId == null) { - return true - } + if (event.parentEventId == null) return true // exclude root events + if (event.parentEventId.batchId != null) return true // exclude parents inside batch + return parentEventCounter.compute(event.parentEventId.eventId.id.toLongHash()) { _, value -> if (value == null) { PARENT_EVENT_COUNTER.inc() @@ -93,7 +92,7 @@ internal interface IParentEventCounter { } else { val next = value + 1 if (value == MAX_EVENT_COUNTER || next > limitForParent) { - if (event.batchId == null) { + if (!event.isBatched) { // exclude batched events if (parentEventCounter.putIfAbsent(event.id.eventId.id.toLongHash(), MAX_EVENT_COUNTER) == null) { PARENT_EVENT_COUNTER.inc() } diff --git a/src/test/kotlin/handlers/IParentEventCounterTest.kt b/src/test/kotlin/handlers/IParentEventCounterTest.kt index eee6fb6a..301f8edf 100644 --- a/src/test/kotlin/handlers/IParentEventCounterTest.kt +++ b/src/test/kotlin/handlers/IParentEventCounterTest.kt @@ -228,7 +228,7 @@ class IParentEventCounterTest { ) }, { - repeat(limitForParent) { + repeat(limitForParent + 1) { assertTrue( eventCounter.checkCountAndGet( createEventEntity( @@ -247,17 +247,14 @@ class IParentEventCounterTest { } }, { - assertFalse( + assertFalse ( eventCounter.checkCountAndGet( createEventEntity( ProviderEventId( - batchId = batchId, + batchId = null, eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), ), - ProviderEventId( - batchId = batchId, - eventId = nextEventId - ) + parentEventId ), ), "child of single event id, attempt", From ebb6960298e82d76cb3cea14f57932b1808e3f88 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Wed, 2 Jul 2025 12:21:01 +0400 Subject: [PATCH 7/7] Update batch parent event id counter --- .../entities/responses/BaseEventEntity.kt | 10 ++++++---- .../rptdataprovider/handlers/IParentEventCounter.kt | 8 ++++++-- .../th2/rptdataprovider/producers/EventProducer.kt | 5 +++-- src/test/kotlin/handlers/IParentEventCounterTest.kt | 6 +++++- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/BaseEventEntity.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/BaseEventEntity.kt index da7d4e3e..a8bd5ed8 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/BaseEventEntity.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/BaseEventEntity.kt @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package com.exactpro.th2.rptdataprovider.entities.responses -import com.exactpro.cradle.testevents.StoredTestEvent import com.exactpro.cradle.testevents.StoredTestEventId import com.exactpro.cradle.testevents.TestEventSingle import com.exactpro.th2.rptdataprovider.entities.internal.ProviderEventId @@ -36,7 +35,8 @@ data class BaseEventEntity( val startTimestamp: Instant, val parentEventId: ProviderEventId?, - val successful: Boolean + val batchParentEventId: StoredTestEventId?, + val successful: Boolean, ) { var attachedMessageIds: Set = emptySet() @@ -52,7 +52,8 @@ data class BaseEventEntity( stored: TestEventSingle, eventId: ProviderEventId, batchId: StoredTestEventId?, - parentEventId: ProviderEventId? + parentEventId: ProviderEventId?, + batchParentEventId: StoredTestEventId?, ) : this( batchId = batchId, isBatched = batchId != null, @@ -62,6 +63,7 @@ data class BaseEventEntity( startTimestamp = stored.startTimestamp, endTimestamp = stored.endTimestamp, parentEventId = parentEventId, + batchParentEventId = batchParentEventId, successful = stored.isSuccess ) { this.rawValue = stored diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt index e21cb42b..4a4d5bfd 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -49,7 +49,9 @@ internal interface IParentEventCounter { if (event.parentEventId == null) return true // exclude root events if (event.parentEventId.batchId != null) return true // exclude parents inside batch - return parentEventCounter.compute(event.parentEventId.eventId.id) { _, value -> + val parentEventId = event.batchParentEventId?.id ?: event.parentEventId.eventId.id + + return parentEventCounter.compute(parentEventId) { _, value -> if (value == null) { PARENT_EVENT_COUNTER.inc() 1L @@ -85,7 +87,9 @@ internal interface IParentEventCounter { if (event.parentEventId == null) return true // exclude root events if (event.parentEventId.batchId != null) return true // exclude parents inside batch - return parentEventCounter.compute(event.parentEventId.eventId.id.toLongHash()) { _, value -> + val parentEventId = event.batchParentEventId?.id ?: event.parentEventId.eventId.id + + return parentEventCounter.compute(parentEventId.toLongHash()) { _, value -> if (value == null) { PARENT_EVENT_COUNTER.inc() 1L diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt index b01c6f71..9350a4d5 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -167,7 +167,8 @@ class EventProducer(private val cradle: CradleService, private val mapper: Objec } else { ProviderEventId(null, parentId) } - } + }, + batch?.parentId ) } diff --git a/src/test/kotlin/handlers/IParentEventCounterTest.kt b/src/test/kotlin/handlers/IParentEventCounterTest.kt index 301f8edf..6ce1e815 100644 --- a/src/test/kotlin/handlers/IParentEventCounterTest.kt +++ b/src/test/kotlin/handlers/IParentEventCounterTest.kt @@ -222,6 +222,7 @@ class IParentEventCounterTest { eventId = nextEventId, ), parentEventId, + parentEventId.eventId, ), ), "single event id, attempt ${limitForParent + 1}", @@ -239,7 +240,8 @@ class IParentEventCounterTest { ProviderEventId( batchId = batchId, eventId = nextEventId - ) + ), + parentEventId.eventId ), ), "child of single event id, attempt ${limitForParent + 1}", @@ -273,6 +275,7 @@ class IParentEventCounterTest { private fun createEventEntity( id: ProviderEventId, parentEventId: ProviderEventId? = null, + batchParentEventId: StoredTestEventId? = null, ) = BaseEventEntity( type = "event", id = id, @@ -283,6 +286,7 @@ class IParentEventCounterTest { startTimestamp = id.eventId.startTimestamp, endTimestamp = null, parentEventId = parentEventId, + batchParentEventId = batchParentEventId, successful = true, ) }