{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — no-ops per PIP; the v5
+ * participants advertise themselves by writing {@code /txn/op} records, so the TC doesn't
+ * need a pre-registration step.
+ *
{@code END_TXN} → {@link #endTransaction}
+ *
+ *
+ *
{@code endTransaction} CAS-updates the header to the terminal state, enumerates
+ * {@code /txn/op/-*} via {@link TxnPaths#IDX_OPS_BY_TXN}, and publishes one
+ * segment-event per affected segment + one subscription-event per affected
+ * {@code (segment, subscription)} pair. The fan-out is metadata-store writes (not RPCs) and
+ * is bounded by the txn's participant count.
+ *
+ *
P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — those land in
+ * P5.2.
+ */
+@CustomLog
+public class TransactionCoordinatorV5 {
+
+ private final PulsarService pulsar;
+ private final TxnMetadataStore txnStore;
+
+ public TransactionCoordinatorV5(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
+ }
+
+ // ---- TC client connect ------------------------------------------------
+
+ /**
+ * Verify this broker is the leader for {@code tcId} (owns the corresponding partition of
+ * {@code transaction_coordinator_assign}). Mirrors the ownership check the legacy
+ * {@code TransactionMetadataStoreService.handleTcClientConnect} performs — the same
+ * topic-ownership mechanism serves as our leader-election surface.
+ */
+ public CompletableFuture handleClientConnect(TransactionCoordinatorID tcId) {
+ String assignPartition = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
+ .getPartition((int) tcId.getId()).toString();
+ return pulsar.getBrokerService().checkTopicNsOwnership(assignPartition);
+ }
+
+ // ---- newTransaction ---------------------------------------------------
+
+ /**
+ * Create a new transaction header at {@code /txn/id/_}. The {@code leastSigBits}
+ * is drawn from the per-tcId monotonic sequence counter ({@link TxnMetadataStore#nextTxnSequence})
+ * so txnIds are never reused — the participant-side aborted-set is keyed by txnId, and reuse
+ * would break that.
+ */
+ public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMillis,
+ String owner) {
+ return txnStore.nextTxnSequence(tcId.getId()).thenCompose(seq -> {
+ TxnID txnId = new TxnID(tcId.getId(), seq);
+ TxnHeader header = new TxnHeader(TxnState.OPEN,
+ Duration.ofMillis(timeoutInMillis), Instant.now(), null);
+ return txnStore.createHeader(TxnIds.toKey(txnId), header).thenApply(stat -> txnId);
+ });
+ }
+
+ // ---- addPartition / addSubscription (no-op in v5) ---------------------
+
+ /**
+ * No-op per PIP-473 — in v5, participants advertise themselves by writing {@code /txn/op}
+ * records when they actually apply ops. The pre-registration step is unnecessary.
+ */
+ public CompletableFuture addProducedPartitionToTxn(TxnID txnId, List partitions) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /** No-op (see {@link #addProducedPartitionToTxn}). */
+ public CompletableFuture addAckedSubscriptionToTxn(TxnID txnId,
+ List subscriptions) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // ---- endTransaction ---------------------------------------------------
+
+ /**
+ * Finalise a transaction: CAS the header to {@code COMMITTED}/{@code ABORTED}, enumerate
+ * the txn's participants via {@link TxnMetadataStore#listOpsByTxn}, and publish one
+ * segment-event per affected segment and one subscription-event per affected
+ * {@code (segment, subscription)} pair. Idempotent against retries — a header already in
+ * the requested terminal state short-circuits without republishing.
+ */
+ public CompletableFuture endTransaction(TxnID txnId, int txnAction) {
+ TxnState newState = newStateFor(txnAction);
+ if (newState == null) {
+ return FutureUtil.failedFuture(
+ new TransactionCoordinatorException.UnsupportedTxnActionException(txnId, txnAction));
+ }
+ String txnIdKey = TxnIds.toKey(txnId);
+ return txnStore.getHeader(txnIdKey).thenCompose(opt -> {
+ if (opt.isEmpty()) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException.TransactionNotFoundException(
+ "Transaction not found: " + txnId));
+ }
+ Versioned v = opt.get();
+ TxnHeader current = v.value();
+ if (current.getState() == newState) {
+ // Idempotent retry — already in the requested terminal state. Re-publish events
+ // is safe but skip for simplicity; participants tolerate missing events via the
+ // header re-read in their reconcile path.
+ return CompletableFuture.completedFuture(null);
+ }
+ if (current.getState() != TxnState.OPEN) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException.InvalidTxnStatusException(
+ "Transaction " + txnId + " is " + current.getState()
+ + ", cannot transition to " + newState));
+ }
+ TxnHeader updated = new TxnHeader(newState, current.getTimeout(),
+ current.getCreatedAt(), Instant.now());
+ return txnStore.updateHeader(txnIdKey, updated, v.version())
+ .thenCompose(stat -> fanOutEvents(txnId, txnIdKey, newState));
+ });
+ }
+
+ private static TxnState newStateFor(int txnAction) {
+ if (txnAction == TxnAction.COMMIT_VALUE) {
+ return TxnState.COMMITTED;
+ } else if (txnAction == TxnAction.ABORT_VALUE) {
+ return TxnState.ABORTED;
+ }
+ return null;
+ }
+
+ /**
+ * Enumerate {@code /txn/op} via {@link TxnMetadataStore#listOpsByTxn}, group by participant,
+ * and publish one event per participant. Writes are independent so we fire them in parallel.
+ */
+ private CompletableFuture fanOutEvents(TxnID txnId, String txnIdKey, TxnState decision) {
+ Set writeSegments = ConcurrentHashSet.create();
+ Set ackParticipants = ConcurrentHashSet.create();
+ return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ TxnOp op = TxnMetadataStore.fromJson(r.getValue(), TxnOp.class);
+ if (op.getKind() == TxnOpKind.WRITE) {
+ writeSegments.add(op.getSegment());
+ } else if (op.getKind() == TxnOpKind.ACK && op.getSubscription() != null) {
+ ackParticipants.add(op.getSegment() + "\0" + op.getSubscription());
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("txnId", txnId).exception(throwable)
+ .log("endTxn participant enumeration encountered an error");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ TxnEvent event = new TxnEvent(txnIdKey, decision);
+ CompletableFuture>[] publishes = new CompletableFuture>[
+ writeSegments.size() + ackParticipants.size()];
+ int i = 0;
+ for (String segment : writeSegments) {
+ publishes[i++] = txnStore.publishSegmentEvent(segment, event);
+ }
+ for (String packed : ackParticipants) {
+ int nul = packed.indexOf('\0');
+ String segment = packed.substring(0, nul);
+ String subscription = packed.substring(nul + 1);
+ publishes[i++] = txnStore.publishSubscriptionEvent(segment, subscription, event);
+ }
+ return CompletableFuture.allOf(publishes);
+ });
+ }
+
+ // ---- Small helper instead of pulling in a third-party concurrent set --------------------
+
+ private static final class ConcurrentHashSet {
+ private static Set create() {
+ return java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>());
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
new file mode 100644
index 0000000000000..77f0b4c516442
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * PIP-473 v5 transaction coordinator: broker-side service that serves {@code NEW_TXN} /
+ * {@code END_TXN} wire commands against the metadata-store-backed
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore}. Replaces the
+ * legacy {@code TransactionMetadataStoreService} when
+ * {@code transactionCoordinatorScalableTopicsEnabled} is on.
+ */
+package org.apache.pulsar.broker.transaction.coordinator.v5;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
new file mode 100644
index 0000000000000..f3fb1ea159fcb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Per-tcId txnId-sequence counter. Stored at {@code /txn/tc-seq/} and CAS-incremented
+ * by the v5 TC for each {@code newTxn}. The yielded value becomes a transaction's
+ * {@code leastSigBits} — monotonic per tcId so {@link org.apache.pulsar.client.api.transaction.TxnID}
+ * is never reused (avoiding aborted-set key collisions in the participant-side visibility state).
+ *
+ * @param next the next {@code leastSigBits} to assign (the most recently issued value + 1)
+ */
+public record TcSequence(long next) {
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 323473ac8b1b6..ebf300b772522 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -27,6 +27,7 @@
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import lombok.CustomLog;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -118,13 +119,17 @@ private static Set