Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3737,6 +3737,17 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean transactionCoordinatorEnabled = false;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Enable the metadata-driven transaction coordinator used by scalable topics."
+ " When true, wire commands (NEW_TXN / END_TXN / etc.) are served by the"
+ " metadata-store-backed coordinator instead of the legacy"
+ " TransactionMetadataStoreService. Requires transactionCoordinatorEnabled"
+ " = true, and must be enabled together with the scalable-topic transaction"
+ " buffer and pending-ack store providers."
)
private boolean transactionCoordinatorScalableTopicsEnabled = false;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Class name for transaction metadata store provider"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.coordinator.v5.TransactionCoordinatorV5;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
Expand Down Expand Up @@ -283,6 +284,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OpenTelemetryTransactionPendingAckStoreStats openTelemetryTransactionPendingAckStoreStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionCoordinatorV5 transactionCoordinatorV5;
private TransactionBufferProvider transactionBufferProvider;
private TransactionBufferClient transactionBufferClient;
private HashedWheelTimer transactionTimer;
Expand Down Expand Up @@ -1030,6 +1032,10 @@ public void start() throws PulsarServerException {
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
transactionBufferClient, transactionTimer);

if (config.isTransactionCoordinatorScalableTopicsEnabled()) {
transactionCoordinatorV5 = new TransactionCoordinatorV5(this);
}

transactionBufferProvider = TransactionBufferProvider
.newProvider(config.getTransactionBufferProviderClassName());
transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3348,6 +3348,21 @@ protected void handleTcClientConnectRequest(CommandTcClientConnectRequest comman
return;
}

if (service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()) {
service.pulsar().getTransactionCoordinatorV5().handleClientConnect(tcId)
.whenComplete((__, e) -> {
if (e == null) {
commandSender.sendTcClientConnectResponse(requestId);
} else {
log.error().attr("requestId", requestId).attr("tcId", tcId).exception(e)
.log("v5 TC client connect failed");
commandSender.sendTcClientConnectResponse(requestId,
BrokerServiceException.getClientErrorCode(e), e.getMessage());
}
});
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

Expand Down Expand Up @@ -3414,6 +3429,22 @@ protected void handleNewTxn(CommandNewTxn command) {
return;
}

if (service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()) {
final String v5Owner = getPrincipal();
service.pulsar().getTransactionCoordinatorV5()
.newTransaction(tcId, command.getTxnTtlSeconds() * 1000L, v5Owner)
.whenComplete((txnId, e) -> {
if (e == null) {
commandSender.sendNewTxnResponse(requestId, txnId, tcId.getId());
} else {
Throwable cause = handleTxnException(e, BaseCommand.Type.NEW_TXN.name(), requestId);
commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
}
});
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
final String owner = getPrincipal();
Expand Down Expand Up @@ -3465,6 +3496,14 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
return;
}

if (service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()) {
// v5: TC doesn't need pre-registration — participants advertise themselves by writing
// /txn/op records when they actually apply ops.
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
verifyTxnOwnership(txnID)
Expand Down Expand Up @@ -3525,6 +3564,20 @@ protected void handleEndTxn(CommandEndTxn command) {
return;
}

if (service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()) {
service.pulsar().getTransactionCoordinatorV5().endTransaction(txnID, txnAction)
.whenComplete((__, e) -> {
if (e == null) {
commandSender.sendEndTxnResponse(requestId, txnID, txnAction);
} else {
Throwable cause = handleTxnException(e, BaseCommand.Type.END_TXN.name(), requestId);
commandSender.sendEndTxnErrorResponse(requestId, txnID,
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
}
});
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

Expand Down Expand Up @@ -3839,6 +3892,14 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
return;
}

if (service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()) {
// v5: TC doesn't need pre-registration — participants advertise themselves by writing
// /txn/op records when they actually apply ops.
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.coordinator.v5;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.CustomLog;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
import org.apache.pulsar.broker.transaction.metadata.TxnIds;
import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
import org.apache.pulsar.broker.transaction.metadata.TxnOp;
import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
import org.apache.pulsar.broker.transaction.metadata.TxnState;
import org.apache.pulsar.broker.transaction.metadata.Versioned;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;

/**
 * PIP-473 v5 transaction coordinator — broker-side service.
 *
 * <p>Per-partition coordinator. A broker runs the v5 TC for partition {@code N} iff it owns
 * partition {@code N} of {@code SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
 * leader-election mechanism the legacy {@code TransactionMetadataStoreService} uses; reusing
 * it keeps the client-side discovery surface unchanged.
 *
 * <p>Wire commands handled (routed by {@code ServerCnx} when
 * {@code transactionCoordinatorScalableTopicsEnabled} is on):
 * <ul>
 *   <li>{@code TC_CLIENT_CONNECT} → {@link #handleClientConnect}</li>
 *   <li>{@code NEW_TXN} → {@link #newTransaction}</li>
 *   <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — no-ops per PIP; the v5
 *       participants advertise themselves by writing {@code /txn/op} records, so the TC doesn't
 *       need a pre-registration step.</li>
 *   <li>{@code END_TXN} → {@link #endTransaction}</li>
 * </ul>
 *
 * <p>{@code endTransaction} CAS-updates the header to the terminal state, enumerates the
 * transaction's {@code /txn/op} records via {@link TxnMetadataStore#listOpsByTxn}, and publishes
 * one segment-event per affected segment + one subscription-event per affected
 * {@code (segment, subscription)} pair. The fan-out is metadata-store writes (not RPCs) and
 * is bounded by the txn's participant count.
 *
 * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — those land in
 * P5.2.
 */
@CustomLog
public class TransactionCoordinatorV5 {

    /**
     * Separator used to pack a {@code (segment, subscription)} pair into a single set element
     * while de-duplicating ack participants in {@link #fanOutEvents}.
     */
    private static final char PARTICIPANT_SEPARATOR = '\0';

    private final PulsarService pulsar;
    private final TxnMetadataStore txnStore;

    /**
     * Create the coordinator on top of the broker's local metadata store.
     *
     * @param pulsar the owning broker service; its local metadata store backs the
     *               {@link TxnMetadataStore} used for every header / op / event operation
     */
    public TransactionCoordinatorV5(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
    }

    // ---- TC client connect ------------------------------------------------

    /**
     * Verify this broker is the leader for {@code tcId} (owns the corresponding partition of
     * {@code transaction_coordinator_assign}). Mirrors the ownership check the legacy
     * {@code TransactionMetadataStoreService.handleTcClientConnect} performs — the same
     * topic-ownership mechanism serves as our leader-election surface.
     *
     * @param tcId the coordinator id the client wants to connect to; its numeric id selects the
     *             partition of the assign topic
     * @return the future produced by the broker service's namespace-ownership check for that
     *         partition
     */
    public CompletableFuture<Void> handleClientConnect(TransactionCoordinatorID tcId) {
        String assignPartition = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
                .getPartition((int) tcId.getId()).toString();
        return pulsar.getBrokerService().checkTopicNsOwnership(assignPartition);
    }

    // ---- newTransaction ---------------------------------------------------

    /**
     * Create a new transaction header at {@code /txn/id/<tcId>_<seq>}. The {@code leastSigBits}
     * is drawn from the per-tcId monotonic sequence counter ({@link TxnMetadataStore#nextTxnSequence})
     * so txnIds are never reused — the participant-side aborted-set is keyed by txnId, and reuse
     * would break that.
     *
     * @param tcId            coordinator that will own the transaction; becomes the
     *                        {@code mostSigBits} of the returned id
     * @param timeoutInMillis transaction timeout in milliseconds, stored in the header as a
     *                        {@link Duration}
     * @param owner           principal that requested the transaction.
     *                        NOTE(review): currently unused by this method — the header written
     *                        here does not record it; confirm whether ownership should be persisted
     * @return a future completing with the new {@link TxnID} once the {@code OPEN} header has
     *         been created
     */
    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMillis,
                                                   String owner) {
        return txnStore.nextTxnSequence(tcId.getId()).thenCompose(seq -> {
            TxnID txnId = new TxnID(tcId.getId(), seq);
            // A freshly opened transaction has no finalisation timestamp yet, hence the null.
            TxnHeader header = new TxnHeader(TxnState.OPEN,
                    Duration.ofMillis(timeoutInMillis), Instant.now(), null);
            return txnStore.createHeader(TxnIds.toKey(txnId), header).thenApply(stat -> txnId);
        });
    }

    // ---- addPartition / addSubscription (no-op in v5) ---------------------

    /**
     * No-op per PIP-473 — in v5, participants advertise themselves by writing {@code /txn/op}
     * records when they actually apply ops. The pre-registration step is unnecessary.
     *
     * @param txnId      ignored
     * @param partitions ignored
     * @return an already-completed future
     */
    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * No-op (see {@link #addProducedPartitionToTxn}).
     *
     * @param txnId         ignored
     * @param subscriptions ignored
     * @return an already-completed future
     */
    public CompletableFuture<Void> addAckedSubscriptionToTxn(TxnID txnId,
                                                             List<TransactionSubscription> subscriptions) {
        return CompletableFuture.completedFuture(null);
    }

    // ---- endTransaction ---------------------------------------------------

    /**
     * Finalise a transaction: CAS the header to {@code COMMITTED}/{@code ABORTED}, enumerate
     * the txn's participants via {@link TxnMetadataStore#listOpsByTxn}, and publish one
     * segment-event per affected segment and one subscription-event per affected
     * {@code (segment, subscription)} pair. Idempotent against retries — a header already in
     * the requested terminal state short-circuits without republishing.
     *
     * @param txnId     the transaction to finalise
     * @param txnAction wire value of the requested action ({@code TxnAction.COMMIT_VALUE} or
     *                  {@code TxnAction.ABORT_VALUE})
     * @return a future that completes when the header has been updated and all events published;
     *         it fails with {@code UnsupportedTxnActionException} for an unknown action,
     *         {@code TransactionNotFoundException} when no header exists, and
     *         {@code InvalidTxnStatusException} when the header is in a different terminal state
     */
    public CompletableFuture<Void> endTransaction(TxnID txnId, int txnAction) {
        TxnState newState = newStateFor(txnAction);
        if (newState == null) {
            return FutureUtil.failedFuture(
                    new TransactionCoordinatorException.UnsupportedTxnActionException(txnId, txnAction));
        }
        String txnIdKey = TxnIds.toKey(txnId);
        return txnStore.getHeader(txnIdKey).thenCompose(opt -> {
            if (opt.isEmpty()) {
                return FutureUtil.failedFuture(
                        new CoordinatorException.TransactionNotFoundException(
                                "Transaction not found: " + txnId));
            }
            Versioned<TxnHeader> v = opt.get();
            TxnHeader current = v.value();
            if (current.getState() == newState) {
                // Idempotent retry — already in the requested terminal state. Re-publish events
                // is safe but skip for simplicity; participants tolerate missing events via the
                // header re-read in their reconcile path.
                return CompletableFuture.completedFuture(null);
            }
            if (current.getState() != TxnState.OPEN) {
                return FutureUtil.failedFuture(
                        new CoordinatorException.InvalidTxnStatusException(
                                "Transaction " + txnId + " is " + current.getState()
                                        + ", cannot transition to " + newState));
            }
            TxnHeader updated = new TxnHeader(newState, current.getTimeout(),
                    current.getCreatedAt(), Instant.now());
            // The version read above is passed back so the update is conditional on the header
            // not having changed in between; events are only fanned out after it succeeds.
            return txnStore.updateHeader(txnIdKey, updated, v.version())
                    .thenCompose(stat -> fanOutEvents(txnId, txnIdKey, newState));
        });
    }

    /**
     * Map a wire-level {@code TxnAction} value to the terminal header state it requests.
     *
     * @param txnAction wire value of the action
     * @return {@code COMMITTED} or {@code ABORTED}, or {@code null} for any other value
     */
    private static TxnState newStateFor(int txnAction) {
        if (txnAction == TxnAction.COMMIT_VALUE) {
            return TxnState.COMMITTED;
        } else if (txnAction == TxnAction.ABORT_VALUE) {
            return TxnState.ABORTED;
        }
        return null;
    }

    /**
     * Enumerate {@code /txn/op} via {@link TxnMetadataStore#listOpsByTxn}, group by participant,
     * and publish one event per participant. Writes are independent so we fire them in parallel.
     *
     * @param txnId    transaction id, used for logging only
     * @param txnIdKey metadata-store key form of the transaction id
     * @param decision terminal state carried in every published event
     * @return a future completing once every publish has completed
     */
    private CompletableFuture<Void> fanOutEvents(TxnID txnId, String txnIdKey, TxnState decision) {
        // Sets de-duplicate participants: many ops may touch the same segment / subscription.
        // Concurrent sets because the scan callback's calling thread is not visible from here.
        Set<String> writeSegments = java.util.concurrent.ConcurrentHashMap.newKeySet();
        Set<String> ackParticipants = java.util.concurrent.ConcurrentHashMap.newKeySet();
        return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
            @Override
            public void onNext(GetResult r) {
                TxnOp op = TxnMetadataStore.fromJson(r.getValue(), TxnOp.class);
                if (op.getKind() == TxnOpKind.WRITE) {
                    writeSegments.add(op.getSegment());
                } else if (op.getKind() == TxnOpKind.ACK && op.getSubscription() != null) {
                    ackParticipants.add(op.getSegment() + PARTICIPANT_SEPARATOR + op.getSubscription());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                // NOTE(review): the error is only logged. Whether the future returned by
                // listOpsByTxn also fails in this case is not visible from this file — confirm,
                // otherwise events are published to a partial participant set.
                log.warn().attr("txnId", txnId).exception(throwable)
                        .log("endTxn participant enumeration encountered an error");
            }

            @Override
            public void onCompleted() {
                // Nothing to do: the fan-out is chained on the future returned by listOpsByTxn.
            }
        }).thenCompose(__ -> {
            TxnEvent event = new TxnEvent(txnIdKey, decision);
            CompletableFuture<?>[] publishes = new CompletableFuture<?>[
                    writeSegments.size() + ackParticipants.size()];
            int i = 0;
            for (String segment : writeSegments) {
                publishes[i++] = txnStore.publishSegmentEvent(segment, event);
            }
            for (String packed : ackParticipants) {
                // Split on the first separator, mirroring how the pair was packed in onNext.
                int sep = packed.indexOf(PARTICIPANT_SEPARATOR);
                String segment = packed.substring(0, sep);
                String subscription = packed.substring(sep + 1);
                publishes[i++] = txnStore.publishSubscriptionEvent(segment, subscription, event);
            }
            return CompletableFuture.allOf(publishes);
        });
    }
}
Loading
Loading