Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.KeyValue;
Expand Down Expand Up @@ -132,6 +133,8 @@ public MessageDupUnknownException(String topicName, String producerName) {

private final String replicatorPrefix;

private static final String REPL_LEDGER_ID_SUFFIX = "_LID";
private static final String REPL_ENTRY_ID_SUFFIX = "_EID";

private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);

Expand Down Expand Up @@ -246,7 +249,10 @@ private CompletableFuture<Void> replayCursor(ManagedCursor cursor) {
managedCursor = cursor;
// Load the sequence ids from the snapshot in the cursor properties
managedCursor.getProperties().forEach((k, v) -> {
producerRemoved(k);
// Geo-replication V2 keys are source-position watermarks, not producer lifecycle state.
if (!isReplSequenceKey(k)) {
producerRemoved(k);
}
highestSequencedPushed.put(k, v);
highestSequencedPersisted.put(k, v);
});
Expand All @@ -257,6 +263,14 @@ private CompletableFuture<Void> replayCursor(ManagedCursor cursor) {
return replayTask.replay(cursor, (__, buffer) -> {
final var metadata = Commands.parseMessageMetadata(buffer);
final var producerName = metadata.getProducerName();
// Rebuild geo-replication watermarks from entries written after the last dedup snapshot.
String replProducerName = getReplProducerName(metadata);
if (replProducerName != null) {
long[] replSourcePosition = getReplSourcePosition(metadata, replProducerName);
if (replSourcePosition != null) {
recoverMessagePersistedRepl(replProducerName, replSourcePosition[0], replSourcePosition[1]);
}
}
final var sequenceId = Math.max(metadata.getHighestSequenceId(), metadata.getSequenceId());
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
Expand Down Expand Up @@ -292,7 +306,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
return MessageDupStatus.NotDup;
}
if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) {
if (!publishContext.supportsReplDedupByLidAndEid()){
if (!publishContext.supportsReplDedupByLidAndEid()) {
return isDuplicateReplV1(publishContext, headersAndPayload);
} else {
return isDuplicateReplV2(publishContext, headersAndPayload);
Expand Down Expand Up @@ -337,34 +351,52 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);

List<KeyValue> kvPairList = md.getPropertiesList();
for (KeyValue kvPair : kvPairList) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) {
if (!kvPair.getValue().contains(":")) {
log.warn()
.attr("producerName", publishContext.getProducerName())
.attr("MSG_PROP_REPL_SOURCE_POSITION", MSG_PROP_REPL_SOURCE_POSITION)
.attr("value", kvPair.getValue())
.log("Unexpected");
break;
}
String[] ledgerIdAndEntryId = kvPair.getValue().split(":");
if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0])
|| !StringUtils.isNumeric(ledgerIdAndEntryId[1])) {
log.warn()
.attr("producerName", publishContext.getProducerName())
.attr("MSG_PROP_REPL_SOURCE_POSITION", MSG_PROP_REPL_SOURCE_POSITION)
.attr("value", kvPair.getValue())
.log("Unexpected");
break;
}
long[] positionPair = new long[]{Long.valueOf(ledgerIdAndEntryId[0]).longValue(),
Long.valueOf(ledgerIdAndEntryId[1]).longValue()};
publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair);
break;
long[] positionPair = getReplSourcePosition(md, publishContext.getProducerName());
if (positionPair != null) {
publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair);
}
}
}

/**
 * Extracts the geo-replication source position carried in the message properties.
 *
 * <p>The first property whose key equals {@code MSG_PROP_REPL_SOURCE_POSITION} is parsed as
 * {@code <ledgerId>:<entryId>}. Only that first matching property is considered; a malformed value is
 * logged at warn level and treated as "no position".
 *
 * @param md           message metadata whose property list is scanned
 * @param producerName producer name, used only as a log attribute
 * @return a two-element array {@code {ledgerId, entryId}}, or {@code null} when the property is absent
 *         or its value cannot be parsed into two {@code long} values
 */
private long[] getReplSourcePosition(MessageMetadata md, String producerName) {
    List<KeyValue> kvPairList = md.getPropertiesList();
    for (KeyValue kvPair : kvPairList) {
        if (!kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) {
            continue;
        }
        String value = kvPair.getValue();
        // A value without ':' splits into fewer than two parts, so the length check covers that case too.
        String[] ledgerIdAndEntryId = value.split(":");
        if (ledgerIdAndEntryId.length == 2 && StringUtils.isNumeric(ledgerIdAndEntryId[0])
                && StringUtils.isNumeric(ledgerIdAndEntryId[1])) {
            try {
                return new long[]{Long.parseLong(ledgerIdAndEntryId[0]),
                        Long.parseLong(ledgerIdAndEntryId[1])};
            } catch (NumberFormatException ignored) {
                // isNumeric() only checks that every character is a digit; a digit string that does not
                // fit in a long still fails to parse. Treat it like any other malformed value below.
            }
        }
        log.warn()
                .attr("producerName", producerName)
                .attr("MSG_PROP_REPL_SOURCE_POSITION", MSG_PROP_REPL_SOURCE_POSITION)
                .attr("value", value)
                .log("Unexpected");
        return null;
    }
    return null;
}

/**
 * Resolves the replicator producer name that a message's geo-replication watermark is keyed by.
 *
 * <p>When the metadata carries a replicated-from cluster, the name is built from the replicator name for
 * that cluster, the replicator producer-name delimiter and the local cluster name. Otherwise the
 * metadata's own producer name is used, but only if it is a remote or shadow producer name.
 *
 * @param md message metadata to inspect
 * @return the replicator producer name, or {@code null} if the message is not a replicated one
 */
private String getReplProducerName(MessageMetadata md) {
    if (!md.hasReplicatedFrom()) {
        final String ownProducerName = md.getProducerName();
        return Producer.isRemoteOrShadow(ownProducerName, replicatorPrefix) ? ownProducerName : null;
    }
    final String replicatorName = AbstractReplicator.getReplicatorName(replicatorPrefix, md.getReplicatedFrom());
    final String localCluster = pulsar.getConfiguration().getClusterName();
    return replicatorName + AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER + localCluster;
}

public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) {
Expand All @@ -384,8 +416,8 @@ public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf
long replSequenceLId = positionPair[0];
long replSequenceEId = positionPair[1];

String lastSequenceLIdKey = publishContext.getProducerName() + "_LID";
String lastSequenceEIdKey = publishContext.getProducerName() + "_EID";
String lastSequenceLIdKey = publishContext.getProducerName() + REPL_LEDGER_ID_SUFFIX;
String lastSequenceEIdKey = publishContext.getProducerName() + REPL_ENTRY_ID_SUFFIX;
synchronized (highestSequencedPushed) {
Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey);
Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey);
Expand Down Expand Up @@ -526,11 +558,24 @@ public void recordMessagePersistedRepl(PublishContext publishContext, Position p
long[] positionPair = (long[]) positionPairObj;
long replSequenceLId = positionPair[0];
long replSequenceEId = positionPair[1];
String lastSequenceLIdKey = publishContext.getProducerName() + "_LID";
String lastSequenceEIdKey = publishContext.getProducerName() + "_EID";
recordMessagePersistedRepl(publishContext.getProducerName(), replSequenceLId, replSequenceEId);
increaseSnapshotCounterAndTakeSnapshotIfNeeded(position);
}

/**
 * Records the persisted geo-replication watermark for a replicator producer.
 *
 * <p>The source ledger id and entry id are stored as two separate entries of
 * {@code highestSequencedPersisted}, keyed by the producer name plus the ledger-id / entry-id suffix.
 *
 * @param producerName    replicator producer name the watermark belongs to
 * @param replSequenceLId source ledger id of the persisted message
 * @param replSequenceEId source entry id of the persisted message
 */
private void recordMessagePersistedRepl(String producerName, long replSequenceLId, long replSequenceEId) {
    highestSequencedPersisted.put(producerName + REPL_LEDGER_ID_SUFFIX, replSequenceLId);
    highestSequencedPersisted.put(producerName + REPL_ENTRY_ID_SUFFIX, replSequenceEId);
}

/**
 * Restores the geo-replication watermark of a replicator producer while replaying the cursor.
 *
 * <p>Both the "pushed" and the "persisted" maps are updated, because a replayed entry has by definition
 * already been written. No snapshot is triggered here: this method has no position to snapshot at.
 *
 * @param producerName    replicator producer name the watermark belongs to
 * @param replSequenceLId source ledger id read from the replayed entry
 * @param replSequenceEId source entry id read from the replayed entry
 */
private void recoverMessagePersistedRepl(String producerName, long replSequenceLId, long replSequenceEId) {
    String lastSequenceLIdKey = producerName + REPL_LEDGER_ID_SUFFIX;
    String lastSequenceEIdKey = producerName + REPL_ENTRY_ID_SUFFIX;
    highestSequencedPushed.put(lastSequenceLIdKey, replSequenceLId);
    highestSequencedPushed.put(lastSequenceEIdKey, replSequenceEId);
    highestSequencedPersisted.put(lastSequenceLIdKey, replSequenceLId);
    highestSequencedPersisted.put(lastSequenceEIdKey, replSequenceEId);
}

public void recordMessagePersistedNormal(PublishContext publishContext, Position position) {
Expand Down Expand Up @@ -585,7 +630,23 @@ private CompletableFuture<Void> takeSnapshot(Position position) {

Map<String, Long> snapshot = new TreeMap<>();
highestSequencedPersisted.forEach((producerName, sequenceId) -> {
if (snapshot.size() < maxNumberOfProducers) {
// A geo-replication watermark is valid only when both source ledger and entry ids are saved together.
if (isReplSequenceKey(producerName)) {
String baseProducerName = getBaseProducerName(producerName);
String ledgerIdKey = baseProducerName + REPL_LEDGER_ID_SUFFIX;
String entryIdKey = baseProducerName + REPL_ENTRY_ID_SUFFIX;
Long ledgerId = highestSequencedPersisted.get(ledgerIdKey);
Long entryId = highestSequencedPersisted.get(entryIdKey);
if (ledgerId != null && entryId != null) {
snapshot.put(ledgerIdKey, ledgerId);
snapshot.put(entryIdKey, entryId);
}
}
});
highestSequencedPersisted.forEach((producerName, sequenceId) -> {
if (isReplSequenceKey(producerName)) {
return;
} else if (snapshot.size() < maxNumberOfProducers) {
snapshot.put(producerName, sequenceId);
}
});
Expand Down Expand Up @@ -636,7 +697,7 @@ public void producerRemoved(String producerName) {
}

// Producer is no-longer active
inactiveProducers.put(producerName, System.currentTimeMillis());
inactiveProducers.put(getBaseProducerName(producerName), System.currentTimeMillis());
}

/**
Expand All @@ -662,20 +723,43 @@ public synchronized void purgeInactiveProducers() {
long lastActiveTimestamp = entry.getValue();

if (lastActiveTimestamp < minimumActiveTimestamp) {
log.info()
.attr("producerName", producerName)
.log("Purging dedup information for producer");
mapIterator.remove();
highestSequencedPushed.remove(producerName);
highestSequencedPersisted.remove(producerName);
hasInactive = true;
if (Producer.isRemoteOrShadow(producerName, replicatorPrefix)) {
// Keep the geo-replication watermark; the source can replay this producer after failover.
log.info()
.attr("producerName", producerName)
.log("Clearing inactive geo-replication producer");
} else {
log.info()
.attr("producerName", producerName)
.log("Purging dedup information for producer");
highestSequencedPushed.remove(producerName);
highestSequencedPersisted.remove(producerName);
hasInactive = true;
}
}
}
if (hasInactive && isEnabled()) {
takeSnapshot(getManagedCursor().getMarkDeletedPosition());
}
}

/**
 * Strips the geo-replication ledger-id / entry-id suffix from a dedup map key.
 *
 * <p>Only names recognised as remote or shadow producers are touched; any other name is returned as is,
 * even if it happens to end with one of the suffixes.
 *
 * @param producerName a producer name or a replication watermark key
 * @return the name without a trailing ledger-id or entry-id suffix, or the input unchanged
 */
private String getBaseProducerName(String producerName) {
    if (!Producer.isRemoteOrShadow(producerName, replicatorPrefix)) {
        return producerName;
    }
    // Ledger-id suffix is checked first; at most one suffix is removed.
    for (String suffix : new String[]{REPL_LEDGER_ID_SUFFIX, REPL_ENTRY_ID_SUFFIX}) {
        if (producerName.endsWith(suffix)) {
            return producerName.substring(0, producerName.length() - suffix.length());
        }
    }
    return producerName;
}

/**
 * Tells whether a dedup map key is a geo-replication watermark key.
 *
 * @param producerName key to test
 * @return {@code true} if the key is a remote or shadow producer name that ends with the ledger-id or
 *         entry-id suffix
 */
private boolean isReplSequenceKey(String producerName) {
    if (!Producer.isRemoteOrShadow(producerName, replicatorPrefix)) {
        return false;
    }
    return producerName.endsWith(REPL_LEDGER_ID_SUFFIX) || producerName.endsWith(REPL_ENTRY_ID_SUFFIX);
}

public long getLastPublishedSequenceId(String producerName) {
Long sequenceId = highestSequencedPushed.get(producerName);
return sequenceId != null ? sequenceId : -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,94 @@ protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
admin2.topics().unload(topicName);
}

/**
 * Verifies that the target cluster still de-duplicates replicated messages after its topic is unloaded
 * and reloaded, when the source replicator then re-sends messages it never got receipts for.
 *
 * <p>Flow: enable deduplication on the target topic, replicate four messages while send receipts are
 * dropped, unload and reload the target topic, let receipts through again and force the replicator to
 * reconnect, then assert the target subscription sees each message exactly once and in order.
 */
@Test
public void testGeoReplDedupAfterTargetUnloadBeforeSnapshot() throws Exception {
// Drop send receipts so the source replication cursor cannot advance while target messages are persisted.
AtomicBoolean stuckSendReceipt = new AtomicBoolean(true);
// NOTE(review): injectReplicatorClientCnx is defined outside this view; presumably it makes the replicator
// use the ClientCnx built here and returns a Runnable that undoes the injection — confirm.
Runnable cleanInjection = injectReplicatorClientCnx((conf, eventLoopGroup) -> new ClientCnx(
InstrumentProvider.NOOP, conf, eventLoopGroup) {
@Override
protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
// Receipts are silently discarded until the flag is cleared.
if (!stuckSendReceipt.get()) {
super.handleSendReceipt(sendReceipt);
}
}
});

final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
final String subscription = "s1";
// Declared outside the try block so the finally block can close them.
Producer<Integer> producer = null;
Consumer<Integer> consumer = null;
try {
// Create the topic on both clusters, and a subscription on the target positioned at the earliest message.
admin1.topics().createNonPartitionedTopic(topicName);
admin2.topics().createNonPartitionedTopic(topicName);
admin2.topics().createSubscription(topicName, subscription, MessageId.earliest);

PersistentTopic persistentTopic2 =
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
// Enable deduplication on the target topic and wait until its status reads "Enabled".
admin2.topicPolicies().setDeduplicationStatus(topicName, true);
Awaitility.await().untilAsserted(() -> {
MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication();
assertEquals(String.valueOf(messageDeduplication2.getStatus()), "Enabled");
});

// Turn on replication from cluster1 to cluster2 only after the target is ready.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
waitReplicatorStarted(topicName);
PersistentTopic persistentTopic1 =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

producer = client1.newProducer(Schema.INT32).topic(topicName).create();
int messageCount = 4;
for (int i = 0; i < messageCount; i++) {
producer.send(i);
}

// Wait until the target's inbound message counter shows all messages arrived.
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
assertEquals(admin2.topics().getStats(topicName).getMsgInCounter(), messageCount);
});

GeoPersistentReplicator replicator =
(GeoPersistentReplicator) persistentTopic1.getReplicators().get(cluster2);
assertNotNull(replicator);

// Reload target before a dedup snapshot has to cover the latest replicated source positions.
admin2.topics().unload(topicName);
pulsar2.getBrokerService().getTopic(topicName, false).join();

// Reconnect the source replicator. It replays from the old replication cursor and target must deduplicate.
stuckSendReceipt.set(false);
replicator.producer.getClientCnx().ctx().channel().close();

// The replication backlog reaching zero means the replayed messages have all been acknowledged.
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
assertEquals(replicator.getCursor().getNumberOfEntriesInBacklog(true), 0);
});

consumer = client2.newConsumer(Schema.INT32).topic(topicName).subscriptionName(subscription).subscribe();
List<Integer> received = new ArrayList<>();
// Drain the subscription; a receive that times out after 2 seconds ends the loop.
while (true) {
Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
received.add(msg.getValue());
}
// Any duplicate stored on the target would make this list longer than the four values sent.
assertEquals(received, Arrays.asList(0, 1, 2, 3));
} finally {
if (consumer != null) {
consumer.close();
}
if (producer != null) {
producer.close();
}
// Undo the ClientCnx injection before the topic cleanup below.
cleanInjection.run();
cleanupTopics(nonReplicatedNamespace, () -> {
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
admin1.topics().delete(topicName, true);
admin2.topics().delete(topicName, true);
});
}
}

@DataProvider(name = "deduplicationArgs")
public Object[][] deduplicationArgs() {
return new Object[][] {
Expand Down
Loading