Skip to content

[TH2-3249]#266

Draft
lumber1000 wants to merge 8 commits intomessage_pipeline_refactoring_cradle_2_20from
TH2-3249
Draft

[TH2-3249]#266
lumber1000 wants to merge 8 commits intomessage_pipeline_refactoring_cradle_2_20from
TH2-3249

Conversation

@lumber1000
Copy link
Copy Markdown
Contributor

codec gRPC interface

@lumber1000 lumber1000 changed the base branch from master to message_pipeline_refactoring_cradle_2_20 April 19, 2022 10:37

@Suppress("MemberVisibilityCanBePrivate")
class Context(
class Context constructor(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Comment on lines +42 to +46
Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher()
)

private val callbackScope = CoroutineScope(
Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need separate thread pools for these scopes? Maybe they can share a single one?

Comment on lines +65 to +70
val firstSequence =
decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence
val lastSequence =
decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence
val stream =
"${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val firstSequence =
decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence
val lastSequence =
decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence
val stream =
"${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}"
val firstMessage = decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message
val lastMessage = decodedBatch.groupsList.lastOrNull()?.messagesList?.lastOrNull()?.message
val firstSequence = firstMessage?.sequence
val lastSequence = lastMessage?.sequence
val stream = "${firstMessage?.sessionAlias}:${firstMessage?.direction.toString()}"

Comment on lines +123 to +126
val sessionAlias =
request.protobufRawMessageBatch.groupsList
.first().messagesList
.first().rawMessage.metadata.id.connectionId.sessionAlias
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val sessionAlias =
request.protobufRawMessageBatch.groupsList
.first().messagesList
.first().rawMessage.metadata.id.connectionId.sessionAlias
val sessionAlias = request.protobufRawMessageBatch.groupsList[0]
.messagesList[0]
.rawMessage
.sessionAlias

Comment on lines +108 to +113
val firstSequence =
request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence
val lastSequence =
request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence
val stream =
"${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Also why do we use first/last here, but firstOrNull/lastOrNull above?

Comment on lines +135 to +140
val firstSequence =
request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence
val lastSequence =
request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence
val stream =
"${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

@lumber1000 lumber1000 requested a review from cordwelt April 21, 2022 10:39
Copy link
Copy Markdown
Contributor

@cordwelt cordwelt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but I think someone from maintainers should look at this too


suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse {
return withContext(requestSenderScope.coroutineContext) {
while (pendingRequests.keys.size > maximumPendingRequests) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while (pendingRequests.keys.size > maximumPendingRequests) {
while (pendingRequests.size > maximumPendingRequests) {

@Paleontolog Paleontolog changed the title Th2 3249 [TH2-3249] Jun 14, 2022
@WbyNghbr WbyNghbr marked this pull request as draft September 7, 2022 10:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants