diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 2db01ccc94624..e123a53abaf3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.common.api.proto.KeyValue; @@ -132,6 +133,8 @@ public MessageDupUnknownException(String topicName, String producerName) { private final String replicatorPrefix; + private static final String REPL_LEDGER_ID_SUFFIX = "_LID"; + private static final String REPL_ENTRY_ID_SUFFIX = "_EID"; private final AtomicBoolean snapshotTaking = new AtomicBoolean(false); @@ -246,7 +249,10 @@ private CompletableFuture replayCursor(ManagedCursor cursor) { managedCursor = cursor; // Load the sequence ids from the snapshot in the cursor properties managedCursor.getProperties().forEach((k, v) -> { - producerRemoved(k); + // Geo-replication V2 keys are source-position watermarks, not producer lifecycle state. + if (!isReplSequenceKey(k)) { + producerRemoved(k); + } highestSequencedPushed.put(k, v); highestSequencedPersisted.put(k, v); }); @@ -257,6 +263,8 @@ private CompletableFuture replayCursor(ManagedCursor cursor) { return replayTask.replay(cursor, (__, buffer) -> { final var metadata = Commands.parseMessageMetadata(buffer); final var producerName = metadata.getProducerName(); + // Rebuild replication watermarks from entries written after the last dedup snapshot. + recoverReplWatermarkFromMetadata(metadata); final var sequenceId = Math.max(metadata.getHighestSequenceId(), metadata.getSequenceId()); highestSequencedPushed.put(producerName, sequenceId); highestSequencedPersisted.put(producerName, sequenceId); @@ -292,7 +300,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade return MessageDupStatus.NotDup; } if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { - if (!publishContext.supportsReplDedupByLidAndEid()){ + if (!publishContext.supportsReplDedupByLidAndEid()) { return isDuplicateReplV1(publishContext, headersAndPayload); } else { return isDuplicateReplV2(publishContext, headersAndPayload); @@ -337,36 +345,74 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); headersAndPayload.readerIndex(readerIndex); - List kvPairList = md.getPropertiesList(); - for (KeyValue kvPair : kvPairList) { - if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) { - if (!kvPair.getValue().contains(":")) { - log.warn() - .attr("producerName", publishContext.getProducerName()) - .attr("MSG_PROP_REPL_SOURCE_POSITION", MSG_PROP_REPL_SOURCE_POSITION) - .attr("value", kvPair.getValue()) - .log("Unexpected"); - break; - } - String[] ledgerIdAndEntryId = kvPair.getValue().split(":"); - if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0]) - || !StringUtils.isNumeric(ledgerIdAndEntryId[1])) { - log.warn() - .attr("producerName", publishContext.getProducerName()) - .attr("MSG_PROP_REPL_SOURCE_POSITION", MSG_PROP_REPL_SOURCE_POSITION) - .attr("value", kvPair.getValue()) - .log("Unexpected"); - break; - } - long[] positionPair = new long[]{Long.valueOf(ledgerIdAndEntryId[0]).longValue(), - Long.valueOf(ledgerIdAndEntryId[1]).longValue()}; - publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair); - break; + long[] positionPair = getReplSourcePosition(md, publishContext.getProducerName()); + if (positionPair != null) { + publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair); + } + } + } + + private long[] getReplSourcePosition(MessageMetadata md, String producerName) { + List kvPairList = md.getPropertiesList(); + for (KeyValue kvPair : kvPairList) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) { + if (!kvPair.getValue().contains(":")) { + log.warn() + .attr("producerName", producerName) + .attr("MSG_PROP_REPL_SOURCE_POSITION", MSG_PROP_REPL_SOURCE_POSITION) + .attr("value", kvPair.getValue()) + .log("Unexpected"); + return null; } + String[] ledgerIdAndEntryId = kvPair.getValue().split(":"); + if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0]) + || !StringUtils.isNumeric(ledgerIdAndEntryId[1])) { + log.warn() + .attr("producerName", producerName) + .attr("MSG_PROP_REPL_SOURCE_POSITION", MSG_PROP_REPL_SOURCE_POSITION) + .attr("value", kvPair.getValue()) + .log("Unexpected"); + return null; + } + return new long[]{Long.valueOf(ledgerIdAndEntryId[0]).longValue(), + Long.valueOf(ledgerIdAndEntryId[1]).longValue()}; + } + } + return null; + } + + @VisibleForTesting + void recoverReplWatermarkFromMetadata(MessageMetadata md) { + if (md.hasMarkerType()) { + return; + } + String replProducerName = getReplProducerName(md); + if (replProducerName != null) { + long[] replSourcePosition = getReplSourcePosition(md, replProducerName); + if (replSourcePosition != null) { + recoverMessagePersistedRepl(replProducerName, replSourcePosition[0], replSourcePosition[1]); } } } + @VisibleForTesting + String getReplProducerName(MessageMetadata md) { + final var shadowSourceTopic = topic.getShadowSourceTopic(); + if (md.hasReplicatedFrom() && shadowSourceTopic.isPresent()) { + return ShadowReplicator.getShadowProducerName(replicatorPrefix, + shadowSourceTopic.get().toString(), topic.getName()); + } + if (md.hasReplicatedFrom()) { + return AbstractReplicator.getReplicatorName(replicatorPrefix, md.getReplicatedFrom()) + + AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER + + pulsar.getConfiguration().getClusterName(); + } + if (Producer.isRemoteOrShadow(md.getProducerName(), replicatorPrefix)) { + return md.getProducerName(); + } + return null; + } + public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) { Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION); if (positionPairObj == null || !(positionPairObj instanceof long[])) { @@ -384,8 +430,8 @@ public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf long replSequenceLId = positionPair[0]; long replSequenceEId = positionPair[1]; - String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; - String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + String lastSequenceLIdKey = publishContext.getProducerName() + REPL_LEDGER_ID_SUFFIX; + String lastSequenceEIdKey = publishContext.getProducerName() + REPL_ENTRY_ID_SUFFIX; synchronized (highestSequencedPushed) { Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey); Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey); @@ -526,11 +572,24 @@ public void recordMessagePersistedRepl(PublishContext publishContext, Position p long[] positionPair = (long[]) positionPairObj; long replSequenceLId = positionPair[0]; long replSequenceEId = positionPair[1]; - String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; - String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + recordMessagePersistedRepl(publishContext.getProducerName(), replSequenceLId, replSequenceEId); + increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); + } + + private void recordMessagePersistedRepl(String producerName, long replSequenceLId, long replSequenceEId) { + String lastSequenceLIdKey = producerName + REPL_LEDGER_ID_SUFFIX; + String lastSequenceEIdKey = producerName + REPL_ENTRY_ID_SUFFIX; + highestSequencedPersisted.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPersisted.put(lastSequenceEIdKey, replSequenceEId); + } + + private void recoverMessagePersistedRepl(String producerName, long replSequenceLId, long replSequenceEId) { + String lastSequenceLIdKey = producerName + REPL_LEDGER_ID_SUFFIX; + String lastSequenceEIdKey = producerName + REPL_ENTRY_ID_SUFFIX; + highestSequencedPushed.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPushed.put(lastSequenceEIdKey, replSequenceEId); highestSequencedPersisted.put(lastSequenceLIdKey, replSequenceLId); highestSequencedPersisted.put(lastSequenceEIdKey, replSequenceEId); - increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); } public void recordMessagePersistedNormal(PublishContext publishContext, Position position) { @@ -585,7 +644,23 @@ private CompletableFuture takeSnapshot(Position position) { Map snapshot = new TreeMap<>(); highestSequencedPersisted.forEach((producerName, sequenceId) -> { - if (snapshot.size() < maxNumberOfProducers) { + // A replication watermark is valid only when both source ledger and entry ids are saved together. + if (isReplSequenceKey(producerName)) { + String baseProducerName = getBaseProducerName(producerName); + String ledgerIdKey = baseProducerName + REPL_LEDGER_ID_SUFFIX; + String entryIdKey = baseProducerName + REPL_ENTRY_ID_SUFFIX; + Long ledgerId = highestSequencedPersisted.get(ledgerIdKey); + Long entryId = highestSequencedPersisted.get(entryIdKey); + if (ledgerId != null && entryId != null) { + snapshot.put(ledgerIdKey, ledgerId); + snapshot.put(entryIdKey, entryId); + } + } + }); + highestSequencedPersisted.forEach((producerName, sequenceId) -> { + if (isReplSequenceKey(producerName)) { + return; + } else if (snapshot.size() < maxNumberOfProducers) { snapshot.put(producerName, sequenceId); } }); @@ -636,7 +711,7 @@ public void producerRemoved(String producerName) { } // Producer is no-longer active - inactiveProducers.put(producerName, System.currentTimeMillis()); + inactiveProducers.put(getBaseProducerName(producerName), System.currentTimeMillis()); } /** @@ -662,13 +737,20 @@ public synchronized void purgeInactiveProducers() { long lastActiveTimestamp = entry.getValue(); if (lastActiveTimestamp < minimumActiveTimestamp) { - log.info() - .attr("producerName", producerName) - .log("Purging dedup information for producer"); mapIterator.remove(); - highestSequencedPushed.remove(producerName); - highestSequencedPersisted.remove(producerName); - hasInactive = true; + if (Producer.isRemoteOrShadow(producerName, replicatorPrefix)) { + // Keep the geo-replication watermark; the source can replay this producer after failover. + log.info() + .attr("producerName", producerName) + .log("Clearing inactive geo-replication producer"); + } else { + log.info() + .attr("producerName", producerName) + .log("Purging dedup information for producer"); + highestSequencedPushed.remove(producerName); + highestSequencedPersisted.remove(producerName); + hasInactive = true; + } } } if (hasInactive && isEnabled()) { @@ -676,6 +758,22 @@ public synchronized void purgeInactiveProducers() { } } + private String getBaseProducerName(String producerName) { + if (Producer.isRemoteOrShadow(producerName, replicatorPrefix)) { + if (producerName.endsWith(REPL_LEDGER_ID_SUFFIX)) { + return producerName.substring(0, producerName.length() - REPL_LEDGER_ID_SUFFIX.length()); + } else if (producerName.endsWith(REPL_ENTRY_ID_SUFFIX)) { + return producerName.substring(0, producerName.length() - REPL_ENTRY_ID_SUFFIX.length()); + } + } + return producerName; + } + + private boolean isReplSequenceKey(String producerName) { + return Producer.isRemoteOrShadow(producerName, replicatorPrefix) + && (producerName.endsWith(REPL_LEDGER_ID_SUFFIX) || producerName.endsWith(REPL_ENTRY_ID_SUFFIX)); + } + public long getLastPublishedSequenceId(String producerName) { Long sequenceId = highestSequencedPushed.get(producerName); return sequenceId != null ? sequenceId : -1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index 5d6f6f688cb9f..eff6b3abc6ceb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -56,6 +56,10 @@ public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic, Managed */ @Override protected String getProducerName() { + return getShadowProducerName(replicatorPrefix, localTopicName, remoteTopicName); + } + + static String getShadowProducerName(String replicatorPrefix, String localTopicName, String remoteTopicName) { return replicatorPrefix + "-" + localTopicName + REPL_PRODUCER_NAME_DELIMITER + remoteTopicName; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java index 20b9f0e8ded5d..23cdcec634a08 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java @@ -272,6 +272,94 @@ protected void handleSendReceipt(CommandSendReceipt sendReceipt) { admin2.topics().unload(topicName); } + @Test + public void testGeoReplDedupAfterTargetUnloadBeforeSnapshot() throws Exception { + // Drop send receipts so the source replication cursor cannot advance while target messages are persisted. + AtomicBoolean stuckSendReceipt = new AtomicBoolean(true); + Runnable cleanInjection = injectReplicatorClientCnx((conf, eventLoopGroup) -> new ClientCnx( + InstrumentProvider.NOOP, conf, eventLoopGroup) { + @Override + protected void handleSendReceipt(CommandSendReceipt sendReceipt) { + if (!stuckSendReceipt.get()) { + super.handleSendReceipt(sendReceipt); + } + } + }); + + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + final String subscription = "s1"; + Producer producer = null; + Consumer consumer = null; + try { + admin1.topics().createNonPartitionedTopic(topicName); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, subscription, MessageId.earliest); + + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + Awaitility.await().untilAsserted(() -> { + MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication(); + assertEquals(String.valueOf(messageDeduplication2.getStatus()), "Enabled"); + }); + + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topicName); + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + + producer = client1.newProducer(Schema.INT32).topic(topicName).create(); + int messageCount = 4; + for (int i = 0; i < messageCount; i++) { + producer.send(i); + } + + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + assertEquals(admin2.topics().getStats(topicName).getMsgInCounter(), messageCount); + }); + + GeoPersistentReplicator replicator = + (GeoPersistentReplicator) persistentTopic1.getReplicators().get(cluster2); + assertNotNull(replicator); + + // Reload target before a dedup snapshot has to cover the latest replicated source positions. + admin2.topics().unload(topicName); + pulsar2.getBrokerService().getTopic(topicName, false).join(); + + // Reconnect the source replicator. It replays from the old replication cursor and target must deduplicate. + stuckSendReceipt.set(false); + replicator.producer.getClientCnx().ctx().channel().close(); + + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + assertEquals(replicator.getCursor().getNumberOfEntriesInBacklog(true), 0); + }); + + consumer = client2.newConsumer(Schema.INT32).topic(topicName).subscriptionName(subscription).subscribe(); + List received = new ArrayList<>(); + while (true) { + Message msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null) { + break; + } + received.add(msg.getValue()); + } + assertEquals(received, Arrays.asList(0, 1, 2, 3)); + } finally { + if (consumer != null) { + consumer.close(); + } + if (producer != null) { + producer.close(); + } + cleanInjection.run(); + cleanupTopics(nonReplicatedNamespace, () -> { + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + admin1.topics().delete(topicName, true); + admin2.topics().delete(topicName, true); + }); + } + } + @DataProvider(name = "deduplicationArgs") public Object[][] deduplicationArgs() { return new Object[][] { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 319ca0b0e68cb..1c20665d4f8d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -38,14 +39,18 @@ import io.netty.buffer.Unpooled; import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import lombok.CustomLog; +import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -58,8 +63,10 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.api.proto.MarkerType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.awaitility.Awaitility; @@ -247,6 +254,178 @@ public void testInactiveProducerRemove() throws Exception { assertFalse(highestSequencedPushed.containsKey(producerName3)); } + @Test + @SuppressWarnings("unchecked") + public void testInactiveGeoReplicationProducerKeepsDedupState() throws Exception { + PulsarService pulsarService = mock(PulsarService.class); + PersistentTopic topic = mock(PersistentTopic.class); + ManagedLedger managedLedger = mock(ManagedLedger.class); + + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL); + serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS); + serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX); + serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1); + + doReturn(serviceConfiguration).when(pulsarService).getConfiguration(); + MessageDeduplication messageDeduplication = spyWithClassAndConstructorArgs(MessageDeduplication.class, + pulsarService, topic, managedLedger); + doReturn(true).when(messageDeduplication).isEnabled(); + + ManagedCursor managedCursor = mock(ManagedCursor.class); + doReturn(PositionFactory.create(0, 0)).when(managedCursor).getMarkDeletedPosition(); + doReturn(managedCursor).when(messageDeduplication).getManagedCursor(); + + Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers"); + field.setAccessible(true); + Map inactiveProducers = (ConcurrentHashMap) field.get(messageDeduplication); + + String replicatorProducerName = REPLICATOR_PREFIX + ".c1-->c2"; + String lastSequenceLIdKey = replicatorProducerName + "_LID"; + String lastSequenceEIdKey = replicatorProducerName + "_EID"; + // Geo V2 dedup state tracks the last source position, so it must outlive producer inactivity cleanup. + messageDeduplication.highestSequencedPushed.put(lastSequenceLIdKey, 7L); + messageDeduplication.highestSequencedPushed.put(lastSequenceEIdKey, 9L); + messageDeduplication.highestSequencedPersisted.put(lastSequenceLIdKey, 7L); + messageDeduplication.highestSequencedPersisted.put(lastSequenceEIdKey, 9L); + + messageDeduplication.producerRemoved(replicatorProducerName); + inactiveProducers.put(replicatorProducerName, System.currentTimeMillis() - 70000); + + messageDeduplication.purgeInactiveProducers(); + + assertFalse(inactiveProducers.containsKey(replicatorProducerName)); + assertEquals(messageDeduplication.highestSequencedPushed.get(lastSequenceLIdKey).longValue(), 7L); + assertEquals(messageDeduplication.highestSequencedPushed.get(lastSequenceEIdKey).longValue(), 9L); + assertEquals(messageDeduplication.highestSequencedPersisted.get(lastSequenceLIdKey).longValue(), 7L); + assertEquals(messageDeduplication.highestSequencedPersisted.get(lastSequenceEIdKey).longValue(), 9L); + } + + @Test + public void testSnapshotStoresCompleteReplicationWatermarkPairs() throws Exception { + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + serviceConfiguration.setBrokerDeduplicationEntriesInterval(1); + serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS); + serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX); + doReturn(serviceConfiguration).when(pulsarService).getConfiguration(); + + PersistentTopic topic = mock(PersistentTopic.class); + doReturn("persistent://prop/ns/tp").when(topic).getName(); + + MessageDeduplication messageDeduplication = new MessageDeduplication(pulsarService, topic, + mock(ManagedLedger.class)); + ManagedCursor managedCursor = mock(ManagedCursor.class); + Field managedCursorField = MessageDeduplication.class.getDeclaredField("managedCursor"); + managedCursorField.setAccessible(true); + managedCursorField.set(messageDeduplication, managedCursor); + + AtomicReference> snapshotRef = new AtomicReference<>(); + Position position = PositionFactory.create(1, 1); + doAnswer(invocation -> { + Map snapshot = invocation.getArgument(1); + snapshotRef.set(new HashMap<>(snapshot)); + MarkDeleteCallback callback = invocation.getArgument(2); + callback.markDeleteComplete(null); + return null; + }).when(managedCursor).asyncMarkDelete(eq(position), any(), any(), any()); + + String completeReplicatorProducerName = REPLICATOR_PREFIX + ".c1-->c2"; + String partialReplicatorProducerName = REPLICATOR_PREFIX + ".c3-->c2"; + messageDeduplication.highestSequencedPersisted.put("normal-producer", 5L); + messageDeduplication.highestSequencedPersisted.put(partialReplicatorProducerName + "_LID", 11L); + + Topic.PublishContext publishContext = mock(Topic.PublishContext.class); + doReturn(completeReplicatorProducerName).when(publishContext).getProducerName(); + doReturn(new long[]{7L, 9L}).when(publishContext).getProperty(MSG_PROP_REPL_SOURCE_POSITION); + + messageDeduplication.recordMessagePersistedRepl(publishContext, position); + + Map snapshot = snapshotRef.get(); + assertNotNull(snapshot); + assertEquals(snapshot.get(completeReplicatorProducerName + "_LID").longValue(), 7L); + assertEquals(snapshot.get(completeReplicatorProducerName + "_EID").longValue(), 9L); + assertEquals(snapshot.get("normal-producer").longValue(), 5L); + assertFalse(snapshot.containsKey(partialReplicatorProducerName + "_LID")); + assertFalse(snapshot.containsKey(partialReplicatorProducerName + "_EID")); + } + + @Test + public void testReplayRecoverShadowReplicationWatermarkUsesShadowProducerName() { + String clusterName = "c2"; + String sourceTopicName = "persistent://prop/ns/source"; + String shadowTopicName = "persistent://prop/ns/source-shadow"; + + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL); + serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS); + serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX); + serviceConfiguration.setClusterName(clusterName); + doReturn(serviceConfiguration).when(pulsarService).getConfiguration(); + + PersistentTopic topic = mock(PersistentTopic.class); + doReturn(shadowTopicName).when(topic).getName(); + doReturn(Optional.of(TopicName.get(sourceTopicName))).when(topic).getShadowSourceTopic(); + + MessageDeduplication messageDeduplication = new MessageDeduplication(pulsarService, topic, + mock(ManagedLedger.class)); + MessageMetadata metadata = new MessageMetadata() + .setProducerName("app-producer") + .setReplicatedFrom("c1") + .setSequenceId(1L) + .setPublishTime(System.currentTimeMillis()); + metadata.addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION).setValue("7:9"); + + messageDeduplication.recoverReplWatermarkFromMetadata(metadata); + + String shadowProducerName = ShadowReplicator.getShadowProducerName(REPLICATOR_PREFIX, sourceTopicName, + shadowTopicName); + assertEquals(messageDeduplication.highestSequencedPushed.get(shadowProducerName + "_LID").longValue(), 7L); + assertEquals(messageDeduplication.highestSequencedPushed.get(shadowProducerName + "_EID").longValue(), 9L); + assertEquals(messageDeduplication.highestSequencedPersisted.get(shadowProducerName + "_LID").longValue(), 7L); + assertEquals(messageDeduplication.highestSequencedPersisted.get(shadowProducerName + "_EID").longValue(), 9L); + + String geoProducerName = REPLICATOR_PREFIX + ".c1-->c2"; + assertFalse(messageDeduplication.highestSequencedPushed.containsKey(geoProducerName + "_LID")); + assertFalse(messageDeduplication.highestSequencedPushed.containsKey(geoProducerName + "_EID")); + } + + @Test + public void testReplayRecoverGeoReplicationWatermarkSkipsMarkers() { + String topicName = "persistent://prop/ns/tp"; + + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL); + serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS); + serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX); + serviceConfiguration.setClusterName("c2"); + doReturn(serviceConfiguration).when(pulsarService).getConfiguration(); + + PersistentTopic topic = mock(PersistentTopic.class); + doReturn(topicName).when(topic).getName(); + doReturn(Optional.empty()).when(topic).getShadowSourceTopic(); + + MessageDeduplication messageDeduplication = new MessageDeduplication(pulsarService, topic, + mock(ManagedLedger.class)); + MessageMetadata metadata = new MessageMetadata() + .setProducerName("app-producer") + .setReplicatedFrom("c1") + .setSequenceId(1L) + .setMarkerType(MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE) + .setPublishTime(System.currentTimeMillis()); + metadata.addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION).setValue("7:9"); + + messageDeduplication.recoverReplWatermarkFromMetadata(metadata); + + String geoProducerName = REPLICATOR_PREFIX + ".c1-->c2"; + assertFalse(messageDeduplication.highestSequencedPushed.containsKey(geoProducerName + "_LID")); + assertFalse(messageDeduplication.highestSequencedPushed.containsKey(geoProducerName + "_EID")); + assertFalse(messageDeduplication.highestSequencedPersisted.containsKey(geoProducerName + "_LID")); + assertFalse(messageDeduplication.highestSequencedPersisted.containsKey(geoProducerName + "_EID")); + } + @Test public void testIsDuplicateWithFailure() {