diff --git a/TD-xxx-try-election-with-applied b/TD-xxx-try-election-with-applied
new file mode 100644
index 000000000000..6c46e49bb5b9
--- /dev/null
+++ b/TD-xxx-try-election-with-applied
@@ -0,0 +1 @@
+Improve the current Raft election implementation by adding a check on appliedIndex: the candidate with the larger appliedIndex should be preferred for election as leader. Do not create a git branch for me.
\ No newline at end of file
diff --git a/docs/001-raft-appliedindex-election/checklists/requirements.md b/docs/001-raft-appliedindex-election/checklists/requirements.md
new file mode 100644
index 000000000000..024af5d1005f
--- /dev/null
+++ b/docs/001-raft-appliedindex-election/checklists/requirements.md
@@ -0,0 +1,36 @@
+# Specification Quality Checklist: Raft Election with Applied Progress Priority
+
+**Purpose**: Validate specification completeness and quality before proceeding to planning
+**Created**: 2026-04-10
+**Feature**: [spec.md](../spec.md)
+
+## Content Quality
+
+- [x] No implementation details (languages, frameworks, APIs)
+- [x] Focused on user value and business needs
+- [x] Written for non-technical stakeholders
+- [x] All mandatory sections completed
+
+## Requirement Completeness
+
+- [x] No [NEEDS CLARIFICATION] markers remain
+- [x] Requirements are testable and unambiguous
+- [x] Success criteria are measurable
+- [x] Success criteria are technology-agnostic (no implementation details)
+- [x] All acceptance scenarios are defined
+- [x] Edge cases are identified
+- [x] Scope is clearly bounded
+- [x] Dependencies and assumptions identified
+
+## Feature Readiness
+
+- [x] All functional requirements have clear acceptance criteria
+- [x] User scenarios cover primary flows
+- [x] Feature meets measurable outcomes defined in Success Criteria
+- [x] No implementation details leak into specification
+
+## Notes
+
+- Validation iteration 1: all checklist items passed.
+- No clarification questions required.
+- Branch creation step was intentionally skipped per user instruction; artifacts were created directly under the sequential feature directory.
diff --git a/docs/001-raft-appliedindex-election/contracts/request-vote-applied-index.md b/docs/001-raft-appliedindex-election/contracts/request-vote-applied-index.md
new file mode 100644
index 000000000000..a4f461af3017
--- /dev/null
+++ b/docs/001-raft-appliedindex-election/contracts/request-vote-applied-index.md
@@ -0,0 +1,43 @@
+# Contract: RequestVote Applied-Index Preference
+
+## Interface
+
+- Interface type: Internal node-to-node sync RPC contract
+- Producer: `syncNodeRequestVotePeers`
+- Consumer: `syncNodeOnRequestVote`
+- Message: `SyncRequestVote`
+
+## Request Contract
+
+Required fields for vote evaluation:
+
+| Field | Meaning | Requirement |
+|------|---------|-------------|
+| `term` | Candidate term | Must equal the sender's current term |
+| `lastLogTerm` | Candidate last log term | Must be populated from local sync log state |
+| `lastLogIndex` | Candidate last log index | Must be populated from local sync log state |
+| `candidateAppliedIndex` | Candidate applied progress snapshot | Must be populated from `FpAppliedIndexCb` at request-build time |
+
+## Vote Evaluation Rules
+
+1. Reject requests from nodes not in the raft group.
+2. Update the local term and step down if the request term is higher, preserving existing behavior.
+3. Apply the existing log-recency gate based on `lastLogTerm` and `lastLogIndex`.
+4. If the candidate fails the log-recency gate, reject the vote regardless of applied progress.
+5. If the candidate passes the log-recency gate, compare `candidateAppliedIndex` with the receiver's local applied index.
+6. Prefer granting votes to the candidate with greater or equal applied progress.
+7. If either side reports an unavailable applied index, treat applied progress as unavailable for ordering and fall back to existing deterministic behavior.
+8. If applied progress is equal, fall back to existing deterministic behavior.
+9. Reset the election timer only when the vote is actually granted, preserving current liveness behavior.
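+
+A condensed C sketch of rules 3-9, modeled on the feature's `syncNodeOnRequestVote` flow. The wrapper name `voteWouldBeGranted` is illustrative only; rules 1-2 are assumed to have run already, and the real code also logs decisions and resets the election timer:
+
+```c
+static bool voteWouldBeGranted(SSyncNode *ths, const SyncRequestVote *pMsg, SyncTerm currentTerm) {
+  // Rules 3-4: the existing log-recency gate rejects regardless of applied progress.
+  if (!syncNodeOnRequestVoteLogOK(ths, (SyncRequestVote *)pMsg)) return false;
+
+  // Rule 5: compare the candidate's applied snapshot with the voter's own progress.
+  SyncIndex myApplied = syncNodeGetAppliedIndex(ths);
+  bool orderable = myApplied >= SYNC_INDEX_BEGIN && pMsg->candidateAppliedIndex >= SYNC_INDEX_BEGIN;
+  if (orderable && pMsg->candidateAppliedIndex < myApplied) {
+    return false;  // rule 6: the candidate's applied progress is behind this voter
+  }
+
+  // Rules 7-8: unavailable or equal applied progress falls through to the
+  // existing deterministic grant rule.
+  return pMsg->term == currentTerm &&
+         (!raftStoreHasVoted(ths) || syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId));
+}
+```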
+
+## Observability Contract
+
+- Vote-decision logs must include both log-recency inputs and applied-progress inputs.
+- Logs for unequal applied indexes must reveal whether the applied-progress comparison influenced the grant result.
+- Election outcome logs must allow operators to explain why the elected node was preferred.
+
+## Compatibility Assumption
+
+- This contract change assumes homogeneous cluster binaries for the feature rollout.
+- Mixed-version compatibility is not included in this feature plan.
+- Operational rollout guidance MUST document same-version deployment as a prerequisite.
\ No newline at end of file
diff --git a/docs/001-raft-appliedindex-election/data-model.md b/docs/001-raft-appliedindex-election/data-model.md
new file mode 100644
index 000000000000..9fbcafef120f
--- /dev/null
+++ b/docs/001-raft-appliedindex-election/data-model.md
@@ -0,0 +1,61 @@
+# Data Model: Raft Election with Applied Progress Priority
+
+## Candidate Vote Request
+
+- Purpose: Internal request-vote message emitted by a candidate to each voting peer during an election round.
+- Fields:
+  - `srcId`: Candidate raft identity.
+  - `destId`: Target voter identity.
+  - `term`: Candidate term for the election round.
+  - `lastLogIndex`: Candidate last log index used by the existing recency gate.
+  - `lastLogTerm`: Candidate last log term used by the existing recency gate.
+  - `candidateAppliedIndex`: Candidate applied progress snapshot used for preference ordering.
+- Validation rules:
+  - `term` must equal the candidate's current persisted term at send time.
+  - `candidateAppliedIndex` must be derived from the FSM callback at the time the request is built.
+  - Message consumers must tolerate equal applied-index values and preserve deterministic tie fallback.
+
+## Local Applied Progress Snapshot
+
+- Purpose: The receiving node's current view of its own applied progress at vote-evaluation time.
+- Fields:
+  - `localAppliedIndex`: Value returned by `FpAppliedIndexCb`.
+  - `localLastLogIndex`: Last local log index.
+  - `localLastLogTerm`: Last local log term.
+- Validation rules:
+  - `localLastLogTerm` and `localLastLogIndex` remain the first-stage election safety check.
+  - `localAppliedIndex` is compared only after the candidate satisfies the safety gate.
+
+## Election Decision Record
+
+- Purpose: Structured evidence for why a vote was granted or rejected and why a leader was elected.
+- Fields:
+  - `term`
+  - `candidateId`
+  - `voterId`
+  - `candidateLastLogTerm`
+  - `candidateLastLogIndex`
+  - `candidateAppliedIndex`
+  - `localLastLogTerm`
+  - `localLastLogIndex`
+  - `localAppliedIndex`
+  - `grantDecision`
+  - `decisionReason`
+- Validation rules:
+  - Every unequal-applied-index vote decision must be explainable from logged decision factors.
+  - Logging must not remove or obscure the existing recency decision context.
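+
+As a reading aid, the record can be pictured as the following C struct (hypothetical; the implementation emits these factors as log fields rather than materializing a struct):
+
+```c
+typedef struct {
+  SyncTerm    term;                   // election term under decision
+  SRaftId     candidateId;            // who asked for the vote
+  SRaftId     voterId;                // who decided
+  SyncTerm    candidateLastLogTerm;   // recency inputs from the request
+  SyncIndex   candidateLastLogIndex;
+  SyncIndex   candidateAppliedIndex;  // applied-progress input from the request
+  SyncTerm    localLastLogTerm;       // recency inputs from the voter
+  SyncIndex   localLastLogIndex;
+  SyncIndex   localAppliedIndex;      // applied-progress input from the voter
+  bool        grantDecision;          // final grant or reject
+  const char *decisionReason;         // e.g. "log behind", "applied behind", "granted"
+} SElectionDecisionRecord;            // hypothetical illustration only
+```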
+
+## Election Round
+
+- Purpose: One leader-election attempt for a vgroup term.
+- Fields:
+  - `term`
+  - `candidateSet`
+  - `voteResponses`
+  - `winner`
+  - `winnerAppliedIndex`
+  - `tieFallbackUsed`
+- State transitions:
+  - `Follower -> Candidate`: election starts and self-vote recorded.
+  - `Candidate -> Leader`: quorum obtained, possibly influenced by applied-progress preference.
+  - `Candidate/Follower -> Follower`: step-down on higher term or vote for another candidate.
\ No newline at end of file
diff --git a/docs/001-raft-appliedindex-election/plan.md b/docs/001-raft-appliedindex-election/plan.md
new file mode 100644
index 000000000000..75ab01203106
--- /dev/null
+++ b/docs/001-raft-appliedindex-election/plan.md
@@ -0,0 +1,87 @@
+# Implementation Plan: Raft Election with Applied Progress Priority
+
+**Branch**: `[001-raft-appliedindex-election]` | **Date**: 2026-04-10 | **Spec**: `/root/github/taosdata/TDinternal/specs/001-raft-appliedindex-election/spec.md`
+**Input**: Feature specification from `/specs/001-raft-appliedindex-election/spec.md`
+
+**Note**: Planning was bound to `SPECIFY_FEATURE=001-raft-appliedindex-election` because the user explicitly requested no branch creation.
+
+## Summary
+
+Improve TDinternal's Raft leader election so that eligible candidates with higher applied progress are preferred, while preserving existing last-log-term/last-log-index safety checks and election liveness. The implementation will extend the internal `SyncRequestVote` contract to carry the candidate's applied index, incorporate an applied-progress comparison into vote granting after existing log recency validation, and add observability plus unit/integration coverage for unequal-progress, tie, and lagging-node election scenarios.
+
+## Technical Context
+
+**Language/Version**: C for sync runtime, C++ for existing Google Test coverage
+**Primary Dependencies**: TDinternal sync library, internal RPC message layer, WAL/log store, FSM callbacks, Google Test
+**Storage**: Raft store metadata, WAL-backed log store, FSM-reported applied progress
+**Testing**: Existing `community/source/libs/sync/test` Google Test executables plus targeted cluster-style sync tests
+**Target Platform**: Linux server nodes running TDinternal sync/raft services
+**Project Type**: Native distributed systems library
+**Performance Goals**: Keep vote evaluation O(1), add no extra election round-trips, and avoid measurable regression in election completion under current replica counts
+**Constraints**: Preserve Raft log safety, preserve deterministic behavior on ties, keep elections live when applied progress is unavailable/equal, and treat mixed-version wire compatibility as out of scope unless separately planned
+**Scale/Scope**: Per-vgroup replica elections across small quorum-based replica sets with optional learners; change scope is limited to sync election logic, internal message schema, and sync tests
+
+## Constitution Check
+
+*GATE: Must pass before Phase 0 research. Re-check after Phase 1 design.*
+
+- Constitution file review result: `.specify/memory/constitution.md` still contains template placeholders and no ratified enforceable principles.
+- Gate status before research: PASS, owing to the absence of enforceable constitutional constraints.
+- Required caution despite the missing constitution rules: preserve existing Raft safety invariants, keep the plan scoped to sync election paths, and do not assume branch creation is available.
+- Post-design re-check: PASS. The design stays within the sync library boundary, preserves current safety gating on log recency, and adds only one internal message-contract change plus corresponding tests.
+
+## Project Structure
+
+### Documentation (this feature)
+
+```text
+specs/001-raft-appliedindex-election/
+├── plan.md
+├── research.md
+├── data-model.md
+├── quickstart.md
+├── contracts/
+│   └── request-vote-applied-index.md
+└── tasks.md
+```
+
+### Source Code (repository root)
+
+```text
+community/source/libs/sync/
+├── inc/
+│   ├── syncElection.h
+│   ├── syncInt.h
+│   ├── syncMessage.h
+│   ├── syncRequestVote.h
+│   └── syncUtil.h
+├── src/
+│   ├── syncElection.c
+│   ├── syncMain.c
+│   ├── syncMessage.c
+│   ├── syncRequestVote.c
+│   └── syncUtil.c
+└── test/
+    ├── syncElectTest.cpp
+    ├── syncRequestVoteTest.cpp
+    ├── syncVotesGrantedTest.cpp
+    └── sync_test_lib/
+
+community/tests/
+└── pytest/
+    └── cluster/
+        └── syncingTest.py
+
+community/contrib/test/traft/
+├── cluster/
+├── join_into_vgroup/
+└── single_node/
+```
+
+**Structure Decision**: Keep all runtime changes inside `community/source/libs/sync/{inc,src}` and validate behavior primarily through `community/source/libs/sync/test`. Use existing cluster-style test scaffolding under `community/contrib/test/traft` or `community/tests/pytest/cluster` only if unit-level coverage cannot express a required election scenario.
+
+## Complexity Tracking
+
+| Violation | Why Needed | Simpler Alternative Rejected Because |
+|-----------|------------|-------------------------------------|
+| Internal wire contract change | `SyncRequestVote` must carry candidate applied progress so remote voters can compare candidates using data they do not currently receive | Deriving candidate applied progress locally is impossible with current message payloads |
diff --git a/docs/001-raft-appliedindex-election/quickstart.md b/docs/001-raft-appliedindex-election/quickstart.md
new file mode 100644
index 000000000000..e19fda5a1b5c
--- /dev/null
+++ b/docs/001-raft-appliedindex-election/quickstart.md
@@ -0,0 +1,51 @@
+# Quickstart: Raft Election with Applied Progress Priority
+
+## 1. Build targeted sync tests
+
+From the repository root:
+
+```sh
+cmake --build build --target syncRequestVoteTest syncVotesGrantedTest syncElectTest
+```
+
+## 2. Implement the runtime change
+
+Update the sync runtime in these areas; a sketch of the two key touch points follows the list:
+
+- Add the applied-index field to `SyncRequestVote` in `community/source/libs/sync/inc/syncMessage.h`.
+- Populate the field in `community/source/libs/sync/src/syncElection.c` when building vote requests.
+- Read local applied progress from `FpAppliedIndexCb` in `community/source/libs/sync/src/syncRequestVote.c` and incorporate it after the existing log-recency gate.
+- Extend request-vote logging in `community/source/libs/sync/src/syncUtil.c` or adjacent sync logging paths so operators can see the comparison inputs.
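+
+The accessor below matches the helper this feature adds in `syncMain.c`; the final line shows where `syncElection.c` stamps the snapshot into each outgoing request:
+
+```c
+// syncMain.c: report local applied progress, or SYNC_INDEX_INVALID when the
+// FSM or its callback is absent.
+SyncIndex syncNodeGetAppliedIndex(const SSyncNode *pSyncNode) {
+  if (pSyncNode == NULL || pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpAppliedIndexCb == NULL) {
+    return SYNC_INDEX_INVALID;
+  }
+  return pSyncNode->pFsm->FpAppliedIndexCb(pSyncNode->pFsm);
+}
+
+// syncElection.c, inside syncNodeRequestVotePeers: snapshot applied progress
+// into every outgoing vote request at request-build time.
+pMsg->candidateAppliedIndex = syncNodeGetAppliedIndex(pNode);
+```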
+
+## 3. Add focused test coverage
+
+Recommended first-pass coverage:
+
+- `syncRequestVoteTest`: grant and reject decisions for higher, lower, equal, and unavailable applied-index cases.
+- `syncVotesGrantedTest` or adjacent helpers: ensure tie handling remains deterministic.
+- `syncElectTest`: verify a higher-applied candidate wins once all candidates are otherwise log-eligible.
+
+## 4. Run targeted tests
+
+Run the built binaries from the build output directory, for example:
+
+```sh
+./build/community/source/libs/sync/test/syncRequestVoteTest
+./build/community/source/libs/sync/test/syncVotesGrantedTest
+./build/community/source/libs/sync/test/syncElectTest
+```
+
+## 5. Capture failover baseline and comparison
+
+- Record a baseline failover drill before enabling the new election preference.
+- Re-run the same drill after the change using the same replica topology and workload.
+- Compare median leader recovery completion time and keep the before/after timestamps with the test logs.
+- If using `community/tests/pytest/cluster/syncingTest.py`, record total elapsed time around the relevant replica and schema-change sequence.
+
+## 6. Verify observability and rollout assumptions
+
+- Trigger an election scenario with unequal applied progress.
+- Confirm logs include candidate/local applied index, last-log term/index, and the grant reason.
+- Confirm tie scenarios still elect a single leader without repeated deadlock.
+- Confirm unavailable applied index falls back to the existing deterministic path instead of blocking votes.
+- Roll out only to same-version clusters because the RequestVote wire contract has changed.
\ No newline at end of file
diff --git a/docs/001-raft-appliedindex-election/research.md b/docs/001-raft-appliedindex-election/research.md
new file mode 100644
index 000000000000..aa61ec4048d3
--- /dev/null
+++ b/docs/001-raft-appliedindex-election/research.md
@@ -0,0 +1,37 @@
+# Research: Raft Election with Applied Progress Priority
+
+## Decision 1: Keep last-log-term and last-log-index as the primary safety gate
+
+- Decision: Preserve the current `lastLogTerm` and `lastLogIndex` vote eligibility logic as the first-stage gate, and only evaluate applied progress after the candidate passes that existing Raft recency check.
+- Rationale: The current implementation in `syncRequestVote.c` enforces Raft-style log freshness. Replacing that check with applied progress would risk violating election safety and diverging from established sync behavior.
+- Alternatives considered: Use applied progress as the sole election criterion. Rejected because applied progress does not encode log freshness or term ordering and would weaken the existing safety guard.
+
+## Decision 2: Extend `SyncRequestVote` with candidate applied index
+
+- Decision: Add a candidate applied-index field to the internal `SyncRequestVote` message and populate it during `syncNodeRequestVotePeers`.
+- Rationale: Remote voters cannot prefer the most up-to-date candidate unless the candidate's applied progress is transmitted with the vote request. The existing contract only carries term and log recency metadata.
+- Alternatives considered: Infer candidate applied progress from heartbeats or cached peer state. Rejected because cached progress may be stale or absent exactly when an election is needed.
+
+## Decision 3: Source applied progress from the FSM callback already exposed by sync
+
+- Decision: Use `SSyncFSM::FpAppliedIndexCb` as the authoritative provider for local applied progress during vote request construction and vote evaluation.
+- Rationale: The sync layer already invokes this callback for observability in `syncUtil.c`, which makes it the least invasive and most consistent source of applied progress.
+- Alternatives considered: Introduce a new state field on `SSyncNode` or reuse `commitIndex`. Rejected because that would duplicate existing data flow or fail to represent the progress that has actually been applied.
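+
+For illustration, an FSM owner wires the callback roughly the way the new sync tests do. The struct and names below are a hypothetical example, not vnode code; the `SSyncFSM` must be the first member so the callback can downcast:
+
+```c
+typedef struct {
+  SSyncFSM  fsm;           // embedded first so an SSyncFSM* can be cast back
+  SyncIndex appliedIndex;  // whatever the state machine last applied
+} SExampleFsm;
+
+static SyncIndex exampleAppliedIndexCb(const SSyncFSM *pFsm) {
+  return ((const SExampleFsm *)pFsm)->appliedIndex;
+}
+
+// registration, done once when the FSM is created:
+//   pExample->fsm.FpAppliedIndexCb = exampleAppliedIndexCb;
+```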
+
+## Decision 4: Define applied progress as a deterministic preference factor, not a liveness blocker
+
+- Decision: Use applied progress to prefer the candidate with the highest applied index when log recency is acceptable, but retain deterministic fallback behavior when applied indexes are equal or unavailable.
+- Rationale: The feature request requires higher applied progress to be preferred, while the specification also requires elections to remain live under ties and degraded conditions.
+- Alternatives considered: Refuse all votes for candidates with missing or lower applied index. Rejected because it could deadlock elections during partial visibility or uniform lag.
+
+## Decision 5: Treat mixed-version message compatibility as out of scope for this feature slice
+
+- Decision: Plan this feature as a homogeneous-version cluster change that updates both message producer and consumer together.
+- Rationale: `SyncRequestVote` is a raw internal message contract with fixed-size allocation in `syncMessage.c`; adding fields changes the on-wire layout. Supporting mixed-version clusters would require a broader version-negotiation or dual-decoding design not requested here.
+- Alternatives considered: Add backward-compatible dual-format decoding in the same feature. Rejected because it expands scope materially beyond election preference behavior.
+
+## Decision 6: Validate behavior primarily with sync unit tests, plus optional cluster-style follow-up coverage
+
+- Decision: Add or update tests in `community/source/libs/sync/test` for message contract coverage, vote-grant decision logic, and election outcomes, with cluster-style tests only if a unit harness cannot express a scenario.
+- Rationale: The sync test suite already contains focused executables for request-vote, vote-manager, and election behavior, which is the most efficient place to pin regressions.
+- Alternatives considered: Rely solely on manual `traft` cluster programs. Rejected because they are slower and less precise for guarding edge cases like ties and unavailable applied progress.
\ No newline at end of file
diff --git a/docs/001-raft-appliedindex-election/spec.md b/docs/001-raft-appliedindex-election/spec.md
new file mode 100644
index 000000000000..52d8d8e74f06
--- /dev/null
+++ b/docs/001-raft-appliedindex-election/spec.md
@@ -0,0 +1,94 @@
+# Feature Specification: Raft Election with Applied Progress Priority
+
+**Feature Branch**: `[001-raft-appliedindex-election]`
+**Created**: 2026-04-10
+**Status**: Draft
+**Input**: User description: "Improve the current Raft election implementation by adding a check on appliedIndex: the candidate with the larger appliedIndex should be preferred for election as leader. Do not create a git branch for me."
+
+## User Scenarios & Testing *(mandatory)*
+
+### User Story 1 - Prefer Most Up-to-Date Candidate (Priority: P1)
+
+As a cluster operator, I want leader election to prefer candidates with higher applied progress so that the elected leader is more likely to hold the most complete and up-to-date state.
+
+**Why this priority**: Leader selection quality directly affects correctness, failover stability, and post-election recovery time.
+
+**Independent Test**: Can be fully tested by triggering elections among candidates with different applied progress values and confirming the winning candidate is the one with the highest eligible applied progress.
+
+**Acceptance Scenarios**:
+
+1. **Given** multiple eligible candidates in the same election round, **When** they have different applied progress values, **Then** the cluster elects the candidate with the highest applied progress.
+2. **Given** an election where one candidate has clearly lower applied progress than another eligible candidate, **When** votes are cast, **Then** the lower-progress candidate does not win if a higher-progress candidate is available.
+
+---
+
+### User Story 2 - Preserve Election Continuity Under Ties (Priority: P2)
+
+As a cluster operator, I want election behavior to remain deterministic when candidates have equal applied progress so that elections can complete reliably without introducing ambiguity.
+
+**Why this priority**: Tied applied progress is common during healthy replication; election completion must remain predictable.
+
+**Independent Test**: Can be fully tested by creating tie conditions where candidates report equal applied progress and verifying election still converges with a consistent tie-breaking outcome.
+
+**Acceptance Scenarios**:
+
+1. **Given** two or more eligible candidates with equal applied progress, **When** an election occurs, **Then** exactly one candidate is elected according to existing deterministic tie-breaking behavior.
+2. **Given** repeated elections under the same tie inputs, **When** elections are retried, **Then** outcomes remain deterministic and no persistent election deadlock occurs.
+
+---
+
+### User Story 3 - Prevent Regressive Leader Selection (Priority: P3)
+
+As an operator responsible for data reliability, I want candidates whose applied progress is meaningfully behind to be deprioritized so that leadership does not regress to stale state holders.
+
+**Why this priority**: Preventing stale leaders reduces the risk of extra catch-up work and delayed service recovery after failover.
+
+**Independent Test**: Can be fully tested by simulating lagging nodes during election and confirming these nodes lose leadership preference to more up-to-date eligible peers.
+
+**Acceptance Scenarios**:
+
+1. **Given** a lagging candidate and a non-lagging candidate are both otherwise eligible, **When** election starts, **Then** the non-lagging candidate receives leadership preference.
+2. **Given** only lagging candidates remain available, **When** election must proceed, **Then** election still completes and cluster availability is maintained.
+
+### Edge Cases
+
+- What happens when all candidates report identical applied progress values across the election set?
+- How does the system handle elections when applied progress metadata is temporarily unavailable or delayed for a candidate?
+- What happens if a candidate's applied progress changes during an in-flight election round?
+- How does the system behave when only one candidate is reachable and it has lower applied progress than previously known leaders?
+
+## Requirements *(mandatory)*
+
+### Functional Requirements
+
+- **FR-001**: The system MUST include candidate applied progress as an explicit election preference factor during leader selection.
+- **FR-002**: The system MUST prefer the eligible candidate with the highest applied progress in the same election decision context.
+- **FR-003**: The system MUST keep election behavior deterministic when two or more eligible candidates have equal applied progress.
+- **FR-004**: The system MUST preserve election liveness, ensuring elections can still complete when applied progress cannot produce a strict ordering.
+- **FR-005**: The system MUST avoid selecting a lower-progress candidate when a higher-progress eligible candidate is available in the same election round. +- **FR-006**: The system MUST record election outcomes with enough observability data to verify that applied progress preference was respected. +- **FR-007**: Operators MUST be able to validate, through logs or metrics, why a specific candidate was elected when applied progress values differ. +- **FR-008**: The system MUST maintain backward-compatible operational behavior for normal election execution, except for the intended change in leader preference. + +### Key Entities *(include if feature involves data)* + +- **Candidate Node**: A node eligible to become leader in an election round, including its election identity and applied progress value. +- **Election Round**: A single leader-selection attempt among reachable eligible candidates, with a final winner and supporting decision context. +- **Applied Progress Snapshot**: The progress value used for each candidate at election decision time. +- **Election Decision Record**: Persisted observability artifact showing winner, competing candidates, and decision-relevant factors. + +## Success Criteria *(mandatory)* + +### Measurable Outcomes + +- **SC-001**: In controlled election tests with unequal candidate progress, at least 95% of elections choose the highest-progress eligible candidate. +- **SC-002**: In tie-progress election tests, 100% of election rounds complete with a single elected leader and no persistent deadlock. +- **SC-003**: During failover drills, median leader recovery completion time improves by at least 15% compared with baseline behavior. +- **SC-004**: In post-release operational audits, 100% of sampled elections with unequal candidate progress provide observable evidence that the elected node had the highest eligible progress. + +## Assumptions + +- Existing eligibility rules for participating in elections remain in effect; this feature adds a preference dimension rather than replacing foundational election safety checks. +- Current deterministic tie-breaking behavior remains the fallback when applied progress values are equal. +- Election observability (logs/metrics) can be extended within existing operational monitoring practices. +- The request explicitly requires no automatic branch creation in this workflow; specification artifacts are created on the current branch only. diff --git a/docs/001-raft-appliedindex-election/tasks.md b/docs/001-raft-appliedindex-election/tasks.md new file mode 100644 index 000000000000..533328bfd6e7 --- /dev/null +++ b/docs/001-raft-appliedindex-election/tasks.md @@ -0,0 +1,198 @@ +# Tasks: Raft Election with Applied Progress Priority + +**Input**: Design documents from `/specs/001-raft-appliedindex-election/` +**Prerequisites**: plan.md, spec.md, research.md, data-model.md, contracts/request-vote-applied-index.md, quickstart.md + +**Tests**: Add focused Google Test and cluster-regression coverage because the plan explicitly requires validation for serialization, vote-grant decisions, tie handling, and election outcomes. + +**Organization**: Tasks are grouped by user story so each behavior change can be implemented and validated independently. 
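+
+As a shape reference for the serialization coverage named above, the round-trip check added to `syncRequestVoteTest.cpp` looks roughly like this (`syncRequestVoteBuildWithAppliedIndex` is the builder these tests introduce; the other helpers come from the existing test library, and the length handling is abridged):
+
+```c
+SyncRequestVote *pMsg = syncRequestVoteBuildWithAppliedIndex(1000, 44);  // vgId, applied index
+char *serialized = (char *)taosMemoryMalloc(pMsg->bytes);
+syncRequestVoteSerialize(pMsg, serialized, pMsg->bytes);
+
+SyncRequestVote *pMsg2 = syncRequestVoteBuild(1000);
+syncRequestVoteDeserialize(serialized, pMsg->bytes, pMsg2);
+TD_ALWAYS_ASSERT(pMsg2->candidateAppliedIndex == pMsg->candidateAppliedIndex);
+
+taosMemoryFree(serialized);
+syncRequestVoteDestroy(pMsg);
+syncRequestVoteDestroy(pMsg2);
+```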
+ +## Phase 1: Setup (Shared Infrastructure) + +**Purpose**: Prepare shared test scaffolding for the extended RequestVote contract + +- [X] T001 Update shared RequestVote helper declarations for candidateAppliedIndex in community/source/libs/sync/test/sync_test_lib/inc/syncTest.h +- [X] T002 [P] Extend shared RequestVote debug and JSON serialization for candidateAppliedIndex in community/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c + +--- + +## Phase 2: Foundational (Blocking Prerequisites) + +**Purpose**: Add the common message-contract and runtime plumbing required by every user story + +**⚠️ CRITICAL**: No user story work can begin until this phase is complete + +- [X] T003 Add candidateAppliedIndex to SyncRequestVote in community/source/libs/sync/inc/syncMessage.h +- [X] T004 Update RequestVote allocation, serialization, and RPC conversion for candidateAppliedIndex in community/source/libs/sync/src/syncMessage.c +- [X] T005 [P] Add syncNodeGetAppliedIndex declaration and implementation in community/source/libs/sync/inc/syncInt.h and community/source/libs/sync/src/syncMain.c +- [X] T006 [P] Extend request-vote logging interfaces for applied-index-aware decision traces in community/source/libs/sync/inc/syncUtil.h and community/source/libs/sync/src/syncUtil.c + +**Checkpoint**: The applied-index field is available in the internal vote contract, can be encoded/decoded in tests, and can be queried/logged by sync runtime code. + +--- + +## Phase 3: User Story 1 - Prefer Most Up-to-Date Candidate (Priority: P1) 🎯 MVP + +**Goal**: Prefer the eligible candidate with the highest applied progress after existing log-recency checks pass. + +**Independent Test**: Trigger elections among otherwise eligible candidates with different applied indexes and verify the highest applied-index candidate wins. + +### Tests for User Story 1 + +- [X] T007 [P] [US1] Extend RequestVote round-trip coverage for candidateAppliedIndex in community/source/libs/sync/test/syncRequestVoteTest.cpp +- [X] T008 [P] [US1] Add unequal-applied-index leader selection coverage in community/source/libs/sync/test/syncElectTest.cpp + +### Implementation for User Story 1 + +- [X] T009 [US1] Populate candidateAppliedIndex in outgoing vote requests in community/source/libs/sync/src/syncElection.c +- [X] T010 [US1] Grant votes to higher-applied eligible candidates after log-recency validation in community/source/libs/sync/src/syncRequestVote.c + +**Checkpoint**: User Story 1 is complete when unequal applied-index candidates still satisfy current Raft safety checks and the most up-to-date eligible node wins election. + +--- + +## Phase 4: User Story 2 - Preserve Election Continuity Under Ties (Priority: P2) + +**Goal**: Keep election outcomes deterministic and live when eligible candidates have equal applied progress. + +**Independent Test**: Re-run equal-applied-index election scenarios and verify a single leader is elected without repeated deadlock. 
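+
+For orientation, the tie path costs nothing new: equal applied indexes leave `appliedOK` true in `syncNodeOnRequestVote`, so the decision falls through to the unchanged grant expression (copied from the implementation):
+
+```c
+// Equal applied progress does not discriminate between candidates, so the
+// historical voteFor/first-come rule still decides the tie deterministically.
+bool grant = (pMsg->term == currentTerm) && logOK && appliedOK &&
+             ((!hasVoted) || syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId));
+```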
+ +### Tests for User Story 2 + +- [X] T011 [P] [US2] Add equal-applied-index vote-decision coverage in community/source/libs/sync/test/syncRequestVoteTest.cpp +- [X] T012 [P] [US2] Add deterministic repeated-tie election coverage in community/source/libs/sync/test/syncVotesGrantedTest.cpp + +### Implementation for User Story 2 + +- [X] T013 [US2] Preserve deterministic tie fallback for equal applied indexes in community/source/libs/sync/src/syncRequestVote.c + +**Checkpoint**: User Story 2 is complete when equal applied-index candidates still converge on one leader through the existing deterministic fallback path. + +--- + +## Phase 5: User Story 3 - Prevent Regressive Leader Selection (Priority: P3) + +**Goal**: Deprioritize lagging candidates without blocking election progress when applied-index ordering is weak or unavailable. + +**Independent Test**: Simulate lagging and missing-applied-index candidates, then confirm stale nodes lose preference when a better candidate exists and elections still complete under degraded inputs. + +### Tests for User Story 3 + +- [X] T014 [P] [US3] Add lower-applied and unavailable-applied RequestVote coverage in community/source/libs/sync/test/syncRequestVoteTest.cpp +- [X] T015 [P] [US3] Add lagging-node preference regression coverage in community/tests/pytest/cluster/syncingTest.py + +### Implementation for User Story 3 + +- [X] T016 [US3] Define and implement fallback handling for lower, equal, and unavailable applied indexes in community/source/libs/sync/src/syncRequestVote.c +- [X] T017 [US3] Emit applied-progress election decision evidence in community/source/libs/sync/src/syncRequestVote.c and community/source/libs/sync/src/syncUtil.c + +**Checkpoint**: User Story 3 is complete when lagging candidates no longer win against more up-to-date eligible peers and operators can explain the outcome from logs. + +--- + +## Phase 6: Polish & Cross-Cutting Concerns + +**Purpose**: Final validation and documentation updates that affect all stories + +- [X] T018 [P] Add failover timing baseline and comparison steps in specs/001-raft-appliedindex-election/quickstart.md and community/tests/pytest/cluster/syncingTest.py +- [X] T019 [P] Add election audit verification steps and same-version rollout notes in specs/001-raft-appliedindex-election/quickstart.md and specs/001-raft-appliedindex-election/contracts/request-vote-applied-index.md +- [X] T020 Run the validation commands documented in specs/001-raft-appliedindex-election/quickstart.md against build/community/source/libs/sync/test/syncRequestVoteTest, build/community/source/libs/sync/test/syncVotesGrantedTest, and build/community/source/libs/sync/test/syncElectTest + +--- + +## Dependencies & Execution Order + +### Phase Dependencies + +- **Setup (Phase 1)**: No dependencies; can start immediately. +- **Foundational (Phase 2)**: Depends on Setup; blocks all user-story work. +- **User Story 1 (Phase 3)**: Depends on Foundational; establishes the MVP behavior change. +- **User Story 2 (Phase 4)**: Depends on Foundational and should land after User Story 1 because it refines the same vote-decision path. +- **User Story 3 (Phase 5)**: Depends on Foundational and should land after User Story 1 because it extends the same applied-index comparison path. +- **Polish (Phase 6)**: Depends on all targeted user stories being complete. + +### User Story Dependencies + +- **User Story 1 (P1)**: No dependency on other user stories; this is the MVP slice. 
+- **User Story 2 (P2)**: Uses the same RequestVote comparison path as User Story 1 but remains independently testable through equal-index scenarios. +- **User Story 3 (P3)**: Uses the same RequestVote comparison path as User Story 1 but remains independently testable through lagging and unavailable-index scenarios. + +### Within Each User Story + +- Tests must fail before the corresponding implementation task is considered complete. +- Message or helper contract updates must land before vote-logic changes that consume them. +- Runtime vote-decision changes must land before observability or cluster-regression verification. + +### Recommended Story Completion Order + +- **US1 → US2 → US3** + +--- + +## Parallel Opportunities + +- `T001` and `T002` can proceed independently because they touch separate test helper files. +- `T005` and `T006` can proceed in parallel after `T003` because runtime applied-index accessors and logging interfaces are independent. +- `T007` and `T008` can proceed in parallel for User Story 1. +- `T011` and `T012` can proceed in parallel for User Story 2. +- `T014` and `T015` can proceed in parallel for User Story 3. + +--- + +## Parallel Example: User Story 1 + +```bash +# Launch both User Story 1 tests in parallel: +Task: "Extend RequestVote round-trip coverage for candidateAppliedIndex in community/source/libs/sync/test/syncRequestVoteTest.cpp" +Task: "Add unequal-applied-index leader selection coverage in community/source/libs/sync/test/syncElectTest.cpp" +``` + +## Parallel Example: User Story 2 + +```bash +# Launch both User Story 2 tests in parallel: +Task: "Add equal-applied-index vote-decision coverage in community/source/libs/sync/test/syncRequestVoteTest.cpp" +Task: "Add deterministic repeated-tie election coverage in community/source/libs/sync/test/syncVotesGrantedTest.cpp" +``` + +## Parallel Example: User Story 3 + +```bash +# Launch both User Story 3 regressions in parallel: +Task: "Add lower-applied and unavailable-applied RequestVote coverage in community/source/libs/sync/test/syncRequestVoteTest.cpp" +Task: "Add lagging-node preference regression coverage in community/tests/pytest/cluster/syncingTest.py" +``` + +--- + +## Implementation Strategy + +### MVP First (User Story 1 Only) + +1. Complete Phase 1: Setup. +2. Complete Phase 2: Foundational. +3. Complete Phase 3: User Story 1. +4. Validate `syncRequestVoteTest` and `syncElectTest` before moving on. + +### Incremental Delivery + +1. Land the shared RequestVote contract and runtime plumbing. +2. Deliver User Story 1 as the leader-preference MVP. +3. Add deterministic tie handling coverage and safeguards in User Story 2. +4. Add lagging-candidate regression coverage and observability in User Story 3. +5. Finish with failover baseline capture, audit guidance, quickstart validation, and documentation cleanup. + +### Parallel Team Strategy + +1. One developer handles foundational message-contract work while another prepares shared test scaffolding. +2. After Foundational is complete, tests for each user story can be prepared in parallel before runtime changes land. +3. Cluster regression work in `community/tests/pytest/cluster/syncingTest.py` can proceed alongside runtime observability changes. + +--- + +## Notes + +- [P] tasks touch different files and can be worked in parallel. +- All user-story tasks carry `[US1]`, `[US2]`, or `[US3]` for traceability. +- Every task includes exact file paths so an implementation agent can execute it directly. +- Suggested MVP scope: Phase 1 + Phase 2 + Phase 3. 
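+
+For T020, assuming the default build layout used throughout this plan, the validation run reduces to:
+
+```sh
+./build/community/source/libs/sync/test/syncRequestVoteTest
+./build/community/source/libs/sync/test/syncVotesGrantedTest
+./build/community/source/libs/sync/test/syncElectTest
+```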
\ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index fc43fe081032..1cdc30bbb5f0 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -324,6 +324,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode); void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId, char* reason); +SyncIndex syncNodeGetAppliedIndex(const SSyncNode* pSyncNode); SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 72a05107eab7..aeab3e4cdb40 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -70,6 +70,7 @@ typedef struct SyncRequestVote { SyncTerm term; SyncIndex lastLogIndex; SyncTerm lastLogTerm; + SyncIndex candidateAppliedIndex; int16_t reserved; } SyncRequestVote; diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 7e7c5f7e16ea..74cb5640755b 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -44,6 +44,7 @@ extern "C" { // m) // /\ UNCHANGED <> // +bool syncNodeOnRequestVoteAppliedIndexOK(SSyncNode* pNode, const SyncRequestVote* pMsg, SyncIndex* pLocalApplied); int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 589a69770676..c181656afb86 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -134,6 +134,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s, const STraceId* trace); +// RequestVote logs include log-recency inputs and candidate applied-progress context. 
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s, const char* opt, const STraceId* trace); void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s, const STraceId* trace); diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 538bb5d37d3f..e489024c7e4a 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -56,6 +56,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { pMsg->srcId = pNode->myRaftId; pMsg->destId = pNode->peersId[i]; pMsg->term = raftStoreGetTerm(pNode); + pMsg->candidateAppliedIndex = syncNodeGetAppliedIndex(pNode); ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm); if (ret < 0) { @@ -65,10 +66,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64()); TRACE_SET_ROOTID(&(rpcMsg.info.traceId), rootId); - sInfo("vgId:%d, send request-vote msg to peerId:0x%" PRIx64 ", lastLogIndex:%" PRId64 ", lastLogTerm:%" PRId64 - ", QID:0x%" PRIx64 ":0x%" PRIx64, - pNode->vgId, pNode->peersId[i].addr, pMsg->lastLogIndex, pMsg->lastLogTerm, - (uint64_t)rpcMsg.info.traceId.rootId, (uint64_t)rpcMsg.info.traceId.msgId); + syncLogSendRequestVote(pNode, pMsg, "", &rpcMsg.info.traceId); ret = syncNodeSendMsgById(&pNode->peersId[i], pNode, &rpcMsg); if (ret < 0) { sError("vgId:%d, failed to send msg to peerId:0x%" PRIx64, pNode->vgId, pNode->peersId[i].addr); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 66e65e4988b6..e6f55d8cb37c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2618,6 +2618,14 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { // return max(logLastIndex, snapshotLastIndex) // if no snapshot and log, return -1 +SyncIndex syncNodeGetAppliedIndex(const SSyncNode* pSyncNode) { + if (pSyncNode == NULL || pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpAppliedIndexCb == NULL) { + return SYNC_INDEX_INVALID; + } + + return pSyncNode->pFsm->FpAppliedIndexCb(pSyncNode->pFsm); +} + SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) { SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 9ba6df0632de..78aa771bf3b4 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -96,6 +96,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { pRequestVote->bytes = bytes; pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE; pRequestVote->vgId = vgId; + pRequestVote->candidateAppliedIndex = SYNC_INDEX_INVALID; return 0; } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 2082c9ec006e..cbb7d31cbe14 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -86,6 +86,32 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) { return false; } +bool syncNodeOnRequestVoteAppliedIndexOK(SSyncNode* ths, const SyncRequestVote* pMsg, SyncIndex* pLocalApplied) { + *pLocalApplied = syncNodeGetAppliedIndex(ths); + + if (*pLocalApplied < SYNC_INDEX_BEGIN || pMsg->candidateAppliedIndex < SYNC_INDEX_BEGIN) { + sNInfo(ths, + "appliedok:1, fallback due to unavailable applied index, {my-applied:%" PRId64 ", recv-applied:%" 
PRId64 + ", recv-term:%" PRIu64 "}", + *pLocalApplied, pMsg->candidateAppliedIndex, pMsg->term); + return true; + } + + if (pMsg->candidateAppliedIndex < *pLocalApplied) { + sNWarn(ths, + "appliedok:0, candidate applied index behind local state, {my-applied:%" PRId64 ", recv-applied:%" PRId64 + ", recv-term:%" PRIu64 "}", + *pLocalApplied, pMsg->candidateAppliedIndex, pMsg->term); + return false; + } + + sNInfo(ths, + "appliedok:1, candidate applied index acceptable, {my-applied:%" PRId64 ", recv-applied:%" PRId64 + ", recv-term:%" PRIu64 "}", + *pLocalApplied, pMsg->candidateAppliedIndex, pMsg->term); + return true; +} + int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVote* pMsg = pRpcMsg->pCont; @@ -101,6 +127,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); + SyncIndex localAppliedIndex = SYNC_INDEX_INVALID; + bool appliedOK = syncNodeOnRequestVoteAppliedIndexOK(ths, pMsg, &localAppliedIndex); // maybe update term if (pMsg->term > raftStoreGetTerm(ths)) { syncNodeStepDown(ths, pMsg->term, pMsg->srcId, "requestVote-1"); @@ -109,11 +137,12 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR; bool hasVoted = raftStoreHasVoted(ths); - bool grant = - (pMsg->term == currentTerm) && logOK && ((!hasVoted) || syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)); + bool grant = (pMsg->term == currentTerm) && logOK && appliedOK && + ((!hasVoted) || syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)); sInfo("vgId:%d, grant:%d, hasVoted:%d, voteFor:0x%" PRIx64 ", srcId:0x%" PRIx64 ", logOK:%d, msg term:%" PRId64 - ", current term:%" PRId64, - ths->vgId, grant, hasVoted, ths->raftStore.voteFor.addr, pMsg->srcId.addr, logOK, pMsg->term, currentTerm); + ", current term:%" PRId64 ", appliedOK:%d, my applied:%" PRId64 ", recv applied:%" PRId64, + ths->vgId, grant, hasVoted, ths->raftStore.voteFor.addr, pMsg->srcId.addr, logOK, pMsg->term, currentTerm, + appliedOK, localAppliedIndex, pMsg->candidateAppliedIndex); if (grant) { // maybe has already voted for pMsg->srcId // vote again, no harm diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 07b34e803231..eb4307f46b5b 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -733,17 +733,17 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, i snprintf(statusMsg, sizeof(statusMsg), "granted:%d", voteGranted); sNInfo(pSyncNode, "%s sync-request-vote from dnode:%d, {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 - "}, %s, QID:0x%" PRIx64 ":0x%" PRIx64, - opt, DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, + ", applied-index:%" PRId64 "}, %s, QID:0x%" PRIx64 ":0x%" PRIx64, + opt, DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pMsg->candidateAppliedIndex, (voteGranted != -1) ? statusMsg : errmsg, trace ? trace->rootId : 0, trace ? trace->msgId : 0); } void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s, const STraceId* trace) { sNInfo(pNode, "send sync-request-vote to dnode:%d {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 - "}, %s, QID:0x%" PRIx64 ":0x%" PRIx64, - DID(&pMsg->destId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s, trace ? trace->rootId : 0, - trace ? 
trace->msgId : 0); + ", applied-index:%" PRId64 "}, %s, QID:0x%" PRIx64 ":0x%" PRIx64, + DID(&pMsg->destId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pMsg->candidateAppliedIndex, s, + trace ? trace->rootId : 0, trace ? trace->msgId : 0); } void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s, diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index 7e134d89b96e..05e514f17e94 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -1,6 +1,34 @@ -#include +#include "syncElection.h" #include "syncTest.h" +extern "C" void syncUtilMsgNtoH(void* msg); + +namespace { + +constexpr int32_t kReplicaNum = 3; +constexpr int32_t kVgId = 1234; +constexpr int32_t kVnodeVersion = 1; +constexpr int32_t kElectIntervalMs = 60000; +constexpr int32_t kHeartbeatIntervalMs = 60000; +constexpr SyncIndex kSnapshotIndex = 0; +constexpr SyncTerm kSnapshotTerm = 1; + +uint16_t gPorts[kReplicaNum] = {7010, 7110, 7210}; +SyncIndex gAppliedIndexes[kReplicaNum] = {0, 1, 2}; + +struct ElectTestFsm { + SSyncFSM fsm; + SyncIndex appliedIndex; +}; + +struct ElectCluster { + SSyncNode* nodes[kReplicaNum]; + SWal* wals[kReplicaNum]; + char baseDir[TSDB_FILENAME_LEN]; +}; + +ElectCluster gCluster = {}; + void logTest() { sTrace("--- sync log test: trace"); sDebug("--- sync log test: debug"); @@ -10,20 +38,51 @@ void logTest() { sFatal("--- sync log test: fatal"); } -uint16_t gPorts[] = {7010, 7110, 7210, 7310, 7410}; -const char* gDir = "./syncElectTest"; -int32_t gVgId = 1234; +static SyncIndex electAppliedIndexCb(const SSyncFSM* pFsm) { return ((const ElectTestFsm*)pFsm)->appliedIndex; } + +static bool electApplyQueueEmptyCb(const SSyncFSM* pFsm) { + TAOS_UNUSED(pFsm); + return true; +} -void init() { - int code = walInit(); +static int32_t electApplyQueueItemsCb(const SSyncFSM* pFsm) { + TAOS_UNUSED(pFsm); + return 0; +} + +static int32_t electSnapshotInfoCb(const SSyncFSM* pFsm, SSnapshot* pSnapshot) { + TAOS_UNUSED(pFsm); + memset(pSnapshot, 0, sizeof(*pSnapshot)); + pSnapshot->state = SYNC_FSM_STATE_COMPLETE; + pSnapshot->lastApplyIndex = kSnapshotIndex; + pSnapshot->lastApplyTerm = kSnapshotTerm; + pSnapshot->lastConfigIndex = SYNC_INDEX_INVALID; + return 0; +} + +static ElectTestFsm* createElectFsm(SyncIndex appliedIndex) { + ElectTestFsm* pFsm = (ElectTestFsm*)taosMemoryCalloc(1, sizeof(ElectTestFsm)); + TD_ALWAYS_ASSERT(pFsm != NULL); + + pFsm->appliedIndex = appliedIndex; + pFsm->fsm.data = pFsm; + pFsm->fsm.FpAppliedIndexCb = electAppliedIndexCb; + pFsm->fsm.FpApplyQueueEmptyCb = electApplyQueueEmptyCb; + pFsm->fsm.FpApplyQueueItems = electApplyQueueItemsCb; + pFsm->fsm.FpGetSnapshotInfo = electSnapshotInfoCb; + return pFsm; +} + +static void initWalEnv() { + int32_t code = walInit(NULL); TD_ALWAYS_ASSERT(code == 0); } -void cleanup() { walCleanUp(); } +static void cleanupWalEnv() { walCleanUp(); } -SWal* createWal(char* path, int32_t vgId) { +static SWal* createWal(const char* path, int32_t vgId) { SWalCfg walCfg; - memset(&walCfg, 0, sizeof(SWalCfg)); + memset(&walCfg, 0, sizeof(walCfg)); walCfg.vgId = vgId; walCfg.fsyncPeriod = 1000; walCfg.retentionPeriod = 1000; @@ -31,90 +90,170 @@ SWal* createWal(char* path, int32_t vgId) { walCfg.retentionSize = 1000; walCfg.segSize = 1000; walCfg.level = TAOS_WAL_FSYNC; + SWal* pWal = walOpen(path, &walCfg); TD_ALWAYS_ASSERT(pWal != NULL); return pWal; } -SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t 
vgId, SWal* pWal, char* path) { - SSyncInfo syncInfo; - syncInfo.vgId = vgId; - syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.syncSendMSg = syncIOSendMsg; - syncInfo.syncEqMsg = syncIOEqMsg; - syncInfo.pFsm = NULL; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); - syncInfo.pWal = pWal; +static int32_t localEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg) { + TAOS_UNUSED(msgcb); + + if (pMsg->pCont != NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + + return 0; +} + +static SSyncNode* findNodeByPort(uint16_t port) { + for (int32_t i = 0; i < kReplicaNum; ++i) { + if (gCluster.nodes[i] != NULL && gCluster.nodes[i]->myNodeInfo.nodePort == port) { + return gCluster.nodes[i]; + } + } + + return NULL; +} + +static int32_t dispatchLocalMsg(SSyncNode* pNode, SRpcMsg* pMsg) { + switch (pMsg->msgType) { + case TDMT_SYNC_REQUEST_VOTE: + return syncNodeOnRequestVote(pNode, pMsg); + case TDMT_SYNC_REQUEST_VOTE_REPLY: + return syncNodeOnRequestVoteReply(pNode, pMsg); + default: + return 0; + } +} + +static int32_t localSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg) { + SSyncNode* pTarget = NULL; + int32_t code = 0; + + if (pEpSet != NULL && pEpSet->numOfEps > 0) { + pTarget = findNodeByPort(pEpSet->eps[0].port); + } + + syncUtilMsgNtoH(pMsg->pCont); + + if (pTarget != NULL) { + code = dispatchLocalMsg(pTarget, pMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE || pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + code = -1; + } + + if (pMsg->pCont != NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + + return code; +} + +static SSyncNode* createSyncNode(int32_t myIndex, SyncIndex appliedIndex) { + SSyncInfo syncInfo = {}; + syncInfo.vgId = kVgId; + syncInfo.msgcb = NULL; + syncInfo.syncSendMSg = localSendMsg; + syncInfo.syncEqMsg = localEqMsg; + syncInfo.syncEqCtrlMsg = localEqMsg; + syncInfo.electMs = kElectIntervalMs; + syncInfo.heartbeatMs = kHeartbeatIntervalMs; + syncInfo.pFsm = &createElectFsm(appliedIndex)->fsm; + + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/node%d", gCluster.baseDir, myIndex); + + char walPath[TSDB_FILENAME_LEN] = {0}; + snprintf(walPath, sizeof(walPath), "%s/wal%d", gCluster.baseDir, myIndex); + gCluster.wals[myIndex] = createWal(walPath, kVgId); + syncInfo.pWal = gCluster.wals[myIndex]; SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; - pCfg->replicaNum = replicaNum; + pCfg->replicaNum = kReplicaNum; + pCfg->totalReplicaNum = kReplicaNum; + pCfg->changeVersion = kVnodeVersion; - for (int i = 0; i < replicaNum; ++i) { + for (int32_t i = 0; i < kReplicaNum; ++i) { pCfg->nodeInfo[i].nodePort = gPorts[i]; - taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn); - // snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); } - SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo, kVnodeVersion, kElectIntervalMs, kHeartbeatIntervalMs); TD_ALWAYS_ASSERT(pSyncNode != NULL); - // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - // 
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->pSyncNode = pSyncNode; + int32_t code = syncNodeStart(pSyncNode); + TD_ALWAYS_ASSERT(code == 0); - syncNodeStart(pSyncNode); + code = syncNodeStopPingTimer(pSyncNode); + TD_ALWAYS_ASSERT(code == 0); return pSyncNode; } -void usage(char* exe) { printf("usage: %s replicaNum myIndex \n", exe); } +static void initCluster() { + memset(&gCluster, 0, sizeof(gCluster)); + snprintf(gCluster.baseDir, sizeof(gCluster.baseDir), "/tmp/syncElectTest-%" PRId64, taosGetTimestampMs()); + taosRemoveDir(gCluster.baseDir); + TD_ALWAYS_ASSERT(taosMulMkDir(gCluster.baseDir) == 0); + + for (int32_t i = 0; i < kReplicaNum; ++i) { + gCluster.nodes[i] = createSyncNode(i, gAppliedIndexes[i]); + TD_ALWAYS_ASSERT(gCluster.nodes[i] != NULL); + TD_ALWAYS_ASSERT(syncNodeGetAppliedIndex(gCluster.nodes[i]) == gAppliedIndexes[i]); + } +} + +static void cleanupCluster() { + for (int32_t i = 0; i < kReplicaNum; ++i) { + if (gCluster.nodes[i] != NULL) { + syncNodeClose(gCluster.nodes[i]); + gCluster.nodes[i] = NULL; + } + + if (gCluster.wals[i] != NULL) { + walClose(gCluster.wals[i]); + gCluster.wals[i] = NULL; + } + } + + if (gCluster.baseDir[0] != '\0') { + taosRemoveDir(gCluster.baseDir); + } +} + +static void testAppliedIndexElectionPreference() { + SSyncNode* lowAppliedNode = gCluster.nodes[0]; + SSyncNode* highAppliedNode = gCluster.nodes[2]; + + TD_ALWAYS_ASSERT(syncNodeElect(lowAppliedNode) == 0); + TD_ALWAYS_ASSERT(lowAppliedNode->state == TAOS_SYNC_STATE_CANDIDATE); + TD_ALWAYS_ASSERT(highAppliedNode->state != TAOS_SYNC_STATE_LEADER); + + TD_ALWAYS_ASSERT(syncNodeElect(highAppliedNode) == 0); + TD_ALWAYS_ASSERT(highAppliedNode->state == TAOS_SYNC_STATE_LEADER); + TD_ALWAYS_ASSERT(lowAppliedNode->state != TAOS_SYNC_STATE_LEADER); +} + +} // namespace -int main(int argc, char** argv) { +int main() { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; - if (argc != 3) { - usage(argv[0]); - exit(-1); - } - int32_t replicaNum = atoi(argv[1]); - int32_t myIndex = atoi(argv[2]); - TD_ALWAYS_ASSERT(replicaNum >= 1 && replicaNum <= 5); - TD_ALWAYS_ASSERT(myIndex >= 0 && myIndex < replicaNum); + logTest(); + initWalEnv(); - init(); - int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); + int32_t ret = syncInit(); TD_ALWAYS_ASSERT(ret == 0); - ret = syncInit(); - TD_ALWAYS_ASSERT(ret == 0); - - char walPath[128]; - snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex); - SWal* pWal = createWal(walPath, gVgId); - SSyncNode* pSyncNode = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir); - TD_ALWAYS_ASSERT(pSyncNode != NULL); - sNTrace(pSyncNode, "==syncElectTest=="); - - //--------------------------- - while (1) { - char* s = syncNode2SimpleStr(pSyncNode); - sTrace("%s", s); - taosMemoryFree(s); - taosMsleep(1000); - } + initCluster(); + testAppliedIndexElectionPreference(); + cleanupCluster(); - syncNodeClose(pSyncNode); - walClose(pWal); - syncIOStop(); - cleanup(); + syncCleanUp(); + cleanupWalEnv(); return 0; } diff --git a/source/libs/sync/test/syncRequestVoteTest.cpp b/source/libs/sync/test/syncRequestVoteTest.cpp index 275f7804bfb2..b6c7a320d498 100644 --- a/source/libs/sync/test/syncRequestVoteTest.cpp +++ b/source/libs/sync/test/syncRequestVoteTest.cpp @@ -1,6 +1,29 @@ -#include +#include "syncRequestVote.h" #include "syncTest.h" +static SyncIndex testAppliedIndexCb(const SSyncFSM *pFsm) { return *((SyncIndex *)pFsm->data); } 
+static SSyncNode createAppliedIndexNode(SyncIndex localAppliedIndex) {
+  SSyncNode node = {0};
+  SSyncFSM fsm = {0};
+
+  fsm.data = taosMemoryMalloc(sizeof(SyncIndex));
+  *((SyncIndex *)fsm.data) = localAppliedIndex;
+  fsm.FpAppliedIndexCb = testAppliedIndexCb;
+
+  node.pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
+  *(node.pFsm) = fsm;
+  return node;
+}
+
+static void destroyAppliedIndexNode(SSyncNode *pNode) {
+  if (pNode->pFsm != NULL) {
+    taosMemoryFree(pNode->pFsm->data);
+    taosMemoryFree(pNode->pFsm);
+    pNode->pFsm = NULL;
+  }
+}
+
 void logTest() {
   sTrace("--- sync log test: trace");
   sDebug("--- sync log test: debug");
@@ -11,7 +34,7 @@ void logTest() {
 }
 
 SyncRequestVote *createMsg() {
-  SyncRequestVote *pMsg = syncRequestVoteBuild(1000);
+  SyncRequestVote *pMsg = syncRequestVoteBuildWithAppliedIndex(1000, 44);
   pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
   pMsg->srcId.vgId = 100;
   pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
@@ -35,6 +58,7 @@ void test2() {
   syncRequestVoteSerialize(pMsg, serialized, len);
   SyncRequestVote *pMsg2 = syncRequestVoteBuild(1000);
   syncRequestVoteDeserialize(serialized, len, pMsg2);
+  TD_ALWAYS_ASSERT(pMsg2->candidateAppliedIndex == pMsg->candidateAppliedIndex);
   syncRequestVoteLog2((char *)"test2: syncRequestVoteSerialize -> syncRequestVoteDeserialize ", pMsg2);
 
   taosMemoryFree(serialized);
@@ -47,6 +71,7 @@ void test3() {
   uint32_t len;
   char *serialized = syncRequestVoteSerialize2(pMsg, &len);
   SyncRequestVote *pMsg2 = syncRequestVoteDeserialize2(serialized, len);
+  TD_ALWAYS_ASSERT(pMsg2->candidateAppliedIndex == pMsg->candidateAppliedIndex);
   syncRequestVoteLog2((char *)"test3: syncRequestVoteSerialize3 -> syncRequestVoteDeserialize2 ", pMsg2);
 
   taosMemoryFree(serialized);
@@ -60,6 +85,7 @@ void test4() {
   syncRequestVote2RpcMsg(pMsg, &rpcMsg);
   SyncRequestVote *pMsg2 = syncRequestVoteBuild(1000);
   syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2);
+  TD_ALWAYS_ASSERT(pMsg2->candidateAppliedIndex == pMsg->candidateAppliedIndex);
   syncRequestVoteLog2((char *)"test4: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg ", pMsg2);
 
   rpcFreeCont(rpcMsg.pCont);
@@ -72,6 +98,7 @@ void test5() {
   SRpcMsg rpcMsg;
   syncRequestVote2RpcMsg(pMsg, &rpcMsg);
   SyncRequestVote *pMsg2 = syncRequestVoteFromRpcMsg2(&rpcMsg);
+  TD_ALWAYS_ASSERT(pMsg2->candidateAppliedIndex == pMsg->candidateAppliedIndex);
   syncRequestVoteLog2((char *)"test5: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg2 ", pMsg2);
 
   rpcFreeCont(rpcMsg.pCont);
@@ -79,6 +106,58 @@
   syncRequestVoteDestroy(pMsg2);
 }
 
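+// Applied-index gate scenarios for syncNodeOnRequestVoteAppliedIndexOK, with
+// the local applied index fixed at 44 unless noted: test6 = candidate ahead,
+// test7 = candidate equal, test8 = candidate behind (vote refused), and
+// test9 = both sides report SYNC_INDEX_INVALID, which still grants the vote.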
+void test6() {
+  SSyncNode node = createAppliedIndexNode(44);
+  SyncRequestVote *pMsg = createMsg();
+  SyncIndex localAppliedIndex = SYNC_INDEX_INVALID;
+
+  pMsg->candidateAppliedIndex = 45;
+  TD_ALWAYS_ASSERT(syncNodeOnRequestVoteAppliedIndexOK(&node, pMsg, &localAppliedIndex));
+  TD_ALWAYS_ASSERT(localAppliedIndex == 44);
+
+  destroyAppliedIndexNode(&node);
+  syncRequestVoteDestroy(pMsg);
+}
+
+void test7() {
+  SSyncNode node = createAppliedIndexNode(44);
+  SyncRequestVote *pMsg = createMsg();
+  SyncIndex localAppliedIndex = SYNC_INDEX_INVALID;
+
+  pMsg->candidateAppliedIndex = 44;
+  TD_ALWAYS_ASSERT(syncNodeOnRequestVoteAppliedIndexOK(&node, pMsg, &localAppliedIndex));
+  TD_ALWAYS_ASSERT(localAppliedIndex == 44);
+
+  destroyAppliedIndexNode(&node);
+  syncRequestVoteDestroy(pMsg);
+}
+
+void test8() {
+  SSyncNode node = createAppliedIndexNode(44);
+  SyncRequestVote *pMsg = createMsg();
+  SyncIndex localAppliedIndex = SYNC_INDEX_INVALID;
+
+  pMsg->candidateAppliedIndex = 43;
+  TD_ALWAYS_ASSERT(!syncNodeOnRequestVoteAppliedIndexOK(&node, pMsg, &localAppliedIndex));
+  TD_ALWAYS_ASSERT(localAppliedIndex == 44);
+
+  destroyAppliedIndexNode(&node);
+  syncRequestVoteDestroy(pMsg);
+}
+
+void test9() {
+  SSyncNode node = createAppliedIndexNode(SYNC_INDEX_INVALID);
+  SyncRequestVote *pMsg = createMsg();
+  SyncIndex localAppliedIndex = SYNC_INDEX_INVALID;
+
+  pMsg->candidateAppliedIndex = SYNC_INDEX_INVALID;
+  TD_ALWAYS_ASSERT(syncNodeOnRequestVoteAppliedIndexOK(&node, pMsg, &localAppliedIndex));
+  TD_ALWAYS_ASSERT(localAppliedIndex == SYNC_INDEX_INVALID);
+
+  destroyAppliedIndexNode(&node);
+  syncRequestVoteDestroy(pMsg);
+}
+
 int main() {
   tsAsyncLog = 0;
   sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
@@ -89,6 +168,10 @@ int main() {
   test3();
   test4();
   test5();
+  test6();
+  test7();
+  test8();
+  test9();
 
   return 0;
 }
diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp
index c4d86a59c7ef..f3a051f11167 100644
--- a/source/libs/sync/test/syncVotesGrantedTest.cpp
+++ b/source/libs/sync/test/syncVotesGrantedTest.cpp
@@ -12,6 +12,8 @@ void logTest() {
 uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
 int32_t replicaNum = 3;
 int32_t myIndex = 0;
+int32_t electMs = 60000;
+int32_t heartbeatMs = 60000;
 
 SRaftId ids[TSDB_MAX_REPLICA];
 SSyncInfo syncInfo;
@@ -36,7 +38,7 @@ SSyncNode* syncNodeInit() {
     // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
   }
 
-  pSyncNode = syncNodeOpen(&syncInfo);
+  pSyncNode = syncNodeOpen(&syncInfo, 1, electMs, heartbeatMs);
   TD_ALWAYS_ASSERT(pSyncNode != NULL);
 
   // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
@@ -63,6 +65,43 @@ void initRaftId(SSyncNode* pSyncNode) {
   }
 }
 
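+// Duplicate grants from the same voter must be idempotent: the serialized
+// SVotesGranted state after two redundant voting rounds has to match the
+// state after a reset followed by a single round.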
+void testDeterministicTieReset(SVotesGranted* pVotesGranted, SyncTerm term) {
+  voteGrantedReset(pVotesGranted, term);
+
+  for (int round = 0; round < 2; ++round) {
+    for (int i = 0; i < 2; ++i) {
+      SyncRequestVoteReply* reply = syncRequestVoteReplyBuild(1000);
+      reply->destId = pSyncNode->myRaftId;
+      reply->srcId = ids[i];
+      reply->term = term;
+      reply->voteGranted = true;
+      voteGrantedVote(pVotesGranted, reply);
+      syncRequestVoteReplyDestroy(reply);
+    }
+  }
+
+  char* first = voteGranted2Str(pVotesGranted);
+  TD_ALWAYS_ASSERT(first != NULL);
+
+  voteGrantedReset(pVotesGranted, term);
+  for (int i = 0; i < 2; ++i) {
+    SyncRequestVoteReply* reply = syncRequestVoteReplyBuild(1000);
+    reply->destId = pSyncNode->myRaftId;
+    reply->srcId = ids[i];
+    reply->term = term;
+    reply->voteGranted = true;
+    voteGrantedVote(pVotesGranted, reply);
+    syncRequestVoteReplyDestroy(reply);
+  }
+
+  char* second = voteGranted2Str(pVotesGranted);
+  TD_ALWAYS_ASSERT(second != NULL);
+  TD_ALWAYS_ASSERT(strcmp(first, second) == 0);
+
+  taosMemoryFree(first);
+  taosMemoryFree(second);
+}
+
 int main(int argc, char** argv) {
   tsAsyncLog = 0;
   sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
@@ -141,6 +180,8 @@ int main(int argc, char** argv) {
     taosMemoryFree(serialized);
   }
 
+  testDeterministicTieReset(pVotesGranted, 20260410);
+
   voteGrantedDestroy(pVotesGranted);
   return 0;
 }
diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h
index fc52e83aa7c2..094f1af5536b 100644
--- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h
+++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h
@@ -44,6 +44,8 @@ extern void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
 typedef struct SyncPing SyncPing;
 typedef struct SyncPingReply SyncPingReply;
+typedef void SRaftEntryHashCache;
+typedef void SRaftEntryCache;
 
 typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
 typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
@@ -241,6 +243,7 @@ void syncClientRequestLog(const SyncClientRequest* pMsg);
 void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
 
 SyncRequestVote* syncRequestVoteBuild(int32_t vgId);
+SyncRequestVote* syncRequestVoteBuildWithAppliedIndex(int32_t vgId, SyncIndex appliedIndex);
 void syncRequestVoteDestroy(SyncRequestVote* pMsg);
 void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
 void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
diff --git a/source/libs/sync/test/sync_test_lib/src/syncIO.c b/source/libs/sync/test/sync_test_lib/src/syncIO.c
index 135d58721689..8ca5ede28560 100644
--- a/source/libs/sync/test/sync_test_lib/src/syncIO.c
+++ b/source/libs/sync/test/sync_test_lib/src/syncIO.c
@@ -23,6 +23,10 @@
 #include "tutil.h"
 #include "tversion.h"
 
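+// Build shim for the legacy test IO: current headers no longer define a
+// dedicated sync-ping message type, so it is mapped onto the heartbeat type
+// here (assuming heartbeat routing is an acceptable stand-in for these tests).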
+#ifndef TDMT_SYNC_PING
+#define TDMT_SYNC_PING TDMT_SYNC_HEARTBEAT
+#endif
+
 bool gRaftDetailLog = false;
 
 SSyncIO *gSyncIO = NULL;
@@ -97,8 +101,9 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
   snprintf(logBuf, sizeof(logBuf), "==syncIOEqMsg== msgType:%d", pMsg->msgType);
   syncRpcMsgLog2(logBuf, pMsg);
 
-  SRpcMsg *pTemp;
-  pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
+  SRpcMsg *pTemp = NULL;
+  int32_t code = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void **)&pTemp);
+  ASSERT(code == 0);
   memcpy(pTemp, pMsg, sizeof(SRpcMsg));
 
   STaosQueue *pMsgQ = gSyncIO->pMsgQ;
@@ -136,8 +141,10 @@ static SSyncIO *syncIOCreate(char *host, uint16_t port) {
   SSyncIO *io = (SSyncIO *)taosMemoryMalloc(sizeof(SSyncIO));
   memset(io, 0, sizeof(*io));
 
-  io->pMsgQ = taosOpenQueue();
-  io->pQset = taosOpenQset();
+  int32_t code = taosOpenQueue(&io->pMsgQ);
+  ASSERT(code == 0);
+  code = taosOpenQset(&io->pQset);
+  ASSERT(code == 0);
   taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
 
   io->myAddr.inUse = 0;
@@ -189,7 +196,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
   rpcInit.idleTime = 100;
   rpcInit.user = "sync-io";
   rpcInit.connType = TAOS_CONN_CLIENT;
-  taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
+  taosVersionStrToInt(td_version, &(rpcInit.compatibilityVer));
   io->clientRpc = rpcOpen(&rpcInit);
   if (io->clientRpc == NULL) {
     sError("failed to initialize RPC");
@@ -210,7 +217,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
   rpcInit.idleTime = 2 * 1500;
   rpcInit.parent = io;
   rpcInit.connType = TAOS_CONN_SERVER;
-  taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
+  taosVersionStrToInt(td_version, &(rpcInit.compatibilityVer));
   void *pRpc = rpcOpen(&rpcInit);
   if (pRpc == NULL) {
     sError("failed to start RPC server");
@@ -245,7 +252,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
 
 static void *syncIOConsumerFunc(void *param) {
   SSyncIO *io = param;
-  STaosQall *qall = taosAllocateQall();
+  STaosQall *qall = NULL;
+  int32_t code = taosAllocateQall(&qall);
+  ASSERT(code == 0);
   SRpcMsg *pRpcMsg, rpcMsg;
   SQueueInfo qinfo = {0};
@@ -381,8 +390,9 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
   syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
 
   SSyncIO *io = pParent;
-  SRpcMsg *pTemp;
-  pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
+  SRpcMsg *pTemp = NULL;
+  int32_t code = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void **)&pTemp);
+  ASSERT(code == 0);
   memcpy(pTemp, pMsg, sizeof(SRpcMsg));
   taosWriteQitem(io->pMsgQ, pTemp);
 }
@@ -441,8 +451,9 @@ static void syncIOTickQ(void *param, void *tmrId) {
   SRpcMsg rpcMsg;
   syncPingReply2RpcMsg(pMsg, &rpcMsg);
 
-  SRpcMsg *pTemp;
-  pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
+  SRpcMsg *pTemp = NULL;
+  int32_t code = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void **)&pTemp);
+  ASSERT(code == 0);
   memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
   syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
   taosWriteQitem(io->pMsgQ, pTemp);
diff --git a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c
index 18a75934fd6e..ae6658d85a5d 100644
--- a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c
+++ b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c
@@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
                    ", sby:%d, "
                    "r-num:%d, "
                    "lcfg:%" PRId64 ", chging:%d, rsto:%d",
-                   pSyncNode->vgId, syncStr(pSyncNode->state), raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex,
+                   pSyncNode->vgId, syncStr(pSyncNode->state), raftStoreGetTerm((SSyncNode*)pSyncNode), pSyncNode->commitIndex,
                    logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum,
                    pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
index 5f011ffe6902..2ba7a7031916 100644
--- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
+++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
@@ -16,6 +16,18 @@
 #define _DEFAULT_SOURCE
 #include "syncTest.h"
 
+#ifndef TDMT_SYNC_PING
+#define TDMT_SYNC_PING TDMT_SYNC_HEARTBEAT
+#endif
+
+#ifndef TDMT_SYNC_PRE_SNAPSHOT
+#define TDMT_SYNC_PRE_SNAPSHOT TDMT_SYNC_PREP_SNAPSHOT
+#endif
+
+#ifndef TDMT_SYNC_PRE_SNAPSHOT_REPLY
+#define TDMT_SYNC_PRE_SNAPSHOT_REPLY TDMT_SYNC_PREP_SNAPSHOT_REPLY
+#endif
+
 // ---- message process SyncPing----
 SyncPing* syncPingBuild(uint32_t dataLen) {
   uint32_t bytes = sizeof(SyncPing) + dataLen;
@@ -737,7 +749,7 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
     snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
     cJSON_AddStringToObject(pRoot, "term", u64buf);
 
-    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
+    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStartTime);
     cJSON_AddStringToObject(pRoot, "startTime", u64buf);
 
     snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->beginIndex);
@@ -1018,6 +1030,13 @@ SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
   pMsg->bytes = bytes;
   pMsg->vgId = vgId;
   pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
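+  // Default to "unknown" applied progress so legacy call sites keep working;
+  // candidates that know their applied index use the builder added below.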
+  pMsg->candidateAppliedIndex = SYNC_INDEX_INVALID;
+  return pMsg;
+}
+
+SyncRequestVote* syncRequestVoteBuildWithAppliedIndex(int32_t vgId, SyncIndex appliedIndex) {
+  SyncRequestVote* pMsg = syncRequestVoteBuild(vgId);
+  pMsg->candidateAppliedIndex = appliedIndex;
   return pMsg;
 }
 
@@ -1118,6 +1137,8 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
     cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
     snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastLogTerm);
     cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
+    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->candidateAppliedIndex);
+    cJSON_AddStringToObject(pRoot, "candidateAppliedIndex", u64buf);
   }
 
   cJSON* pJson = cJSON_CreateObject();
diff --git a/source/libs/sync/test/sync_test_lib/src/syncRaftEntryDebug.c b/source/libs/sync/test/sync_test_lib/src/syncRaftEntryDebug.c
index 8179b24d2964..74c8e160d97e 100644
--- a/source/libs/sync/test/sync_test_lib/src/syncRaftEntryDebug.c
+++ b/source/libs/sync/test/sync_test_lib/src/syncRaftEntryDebug.c
@@ -86,33 +86,12 @@ void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
 
 //-----------------------------------
 cJSON* raftCache2Json(SRaftEntryHashCache* pCache) {
-  char u64buf[128] = {0};
+  char ptrbuf[128] = {0};
 
   cJSON* pRoot = cJSON_CreateObject();
   if (pCache != NULL) {
-    taosThreadMutexLock(&pCache->mutex);
-
-    snprintf(u64buf, sizeof(u64buf), "%p", pCache->pSyncNode);
-    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
-    cJSON_AddNumberToObject(pRoot, "currentCount", pCache->currentCount);
-    cJSON_AddNumberToObject(pRoot, "maxCount", pCache->maxCount);
-    cJSON* pEntries = cJSON_CreateArray();
-    cJSON_AddItemToObject(pRoot, "entries", pEntries);
-
-    SSyncRaftEntry* pIter = (SSyncRaftEntry*)taosHashIterate(pCache->pEntryHash, NULL);
-    if (pIter != NULL) {
-      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pIter;
-      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
-    }
-    while (pIter) {
-      pIter = taosHashIterate(pCache->pEntryHash, pIter);
-      if (pIter != NULL) {
-        SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pIter;
-        cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
-      }
-    }
-
-    taosThreadMutexUnlock(&pCache->mutex);
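+    // The entry caches are opaque to the test lib now (void typedefs in
+    // syncTest.h), so only the pointer identity is logged instead of walking
+    // the entries under the cache mutex.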
+    snprintf(ptrbuf, sizeof(ptrbuf), "%p", pCache);
+    cJSON_AddStringToObject(pRoot, "opaqueCache", ptrbuf);
   }
 
   cJSON* pJson = cJSON_CreateObject();
@@ -156,29 +135,12 @@ void raftCacheLog2(char* s, SRaftEntryHashCache* pCache) {
 }
 
 cJSON* raftEntryCache2Json(SRaftEntryCache* pCache) {
-  char u64buf[128] = {0};
+  char ptrbuf[128] = {0};
 
   cJSON* pRoot = cJSON_CreateObject();
   if (pCache != NULL) {
-    taosThreadMutexLock(&pCache->mutex);
-
-    snprintf(u64buf, sizeof(u64buf), "%p", pCache->pSyncNode);
-    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
-    cJSON_AddNumberToObject(pRoot, "currentCount", pCache->currentCount);
-    cJSON_AddNumberToObject(pRoot, "maxCount", pCache->maxCount);
-    cJSON* pEntries = cJSON_CreateArray();
-    cJSON_AddItemToObject(pRoot, "entries", pEntries);
-
-    SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
-    while (tSkipListIterNext(pIter)) {
-      SSkipListNode* pNode = tSkipListIterGet(pIter);
-      ASSERT(pNode != NULL);
-      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
-      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
-    }
-    tSkipListDestroyIter(pIter);
-
-    taosThreadMutexUnlock(&pCache->mutex);
+    snprintf(ptrbuf, sizeof(ptrbuf), "%p", pCache);
+    cJSON_AddStringToObject(pRoot, "opaqueCache", ptrbuf);
   }
 
   cJSON* pJson = cJSON_CreateObject();
diff --git a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c
index 2edcb0ad4dc4..25ba330901cd 100644
--- a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c
+++ b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c
@@ -28,19 +28,8 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
     snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
     cJSON_AddStringToObject(pRoot, "pReader", u64buf);
 
-    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
-    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
-    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);
-
-    if (pSender->pCurrentBlock != NULL) {
-      char *s;
-      s = syncUtilPrintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
-      cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
-      taosMemoryFree(s);
-      s = syncUtilPrintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
-      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
-      taosMemoryFree(s);
-    }
+    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSndBuf);
+    cJSON_AddStringToObject(pRoot, "pSndBuf", u64buf);
 
     cJSON *pSnapshot = cJSON_CreateObject();
     snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyIndex);
@@ -115,7 +104,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
     snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term);
     cJSON_AddStringToObject(pRoot, "term", u64buf);
 
-    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime);
+    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->receiverStartTime);
     cJSON_AddStringToObject(pRoot, "startTime", u64buf);
   }
diff --git a/tests/pytest/cluster/clusterSetup.py b/tests/pytest/cluster/clusterSetup.py
index 809e0e9d25ed..8374d6a71369 100644
--- a/tests/pytest/cluster/clusterSetup.py
+++ b/tests/pytest/cluster/clusterSetup.py
@@ -148,6 +148,9 @@ def __init__(self):
         # self.tdnodes.append(Node(2, 'root', '51.141.167.23', 'node3', 'a', '/root/'))
         # self.tdnodes.append(Node(3, 'root', '52.247.207.173', 'node4', 'a', '/root/'))
         # self.tdnodes.append(Node(4, 'root', '51.141.166.100', 'node5', 'a', '/root/'))
+        self.node1 = self.tdnodes[0] if len(self.tdnodes) > 0 else None
+        self.node2 = self.tdnodes[1] if len(self.tdnodes) > 1 else None
+        self.node3 = self.tdnodes[2] if len(self.tdnodes) > 2 else None
 
     def stopOneNode(self, index):
         self.tdnodes[index].stopTaosd()
diff --git a/tests/pytest/cluster/syncingTest.py b/tests/pytest/cluster/syncingTest.py
index 96be048d231e..104b4d373b16 100644
--- a/tests/pytest/cluster/syncingTest.py
+++ b/tests/pytest/cluster/syncingTest.py
@@ -12,17 +12,48 @@
 # -*- coding: utf-8 -*-
 
 import sys
+import time
 from clusterSetup import *
 from util.sql import tdSql
 from util.log import tdLog
 import random
 
 class ClusterTestcase:
+
+    def _has_three_nodes(self, nodes):
+        return len(nodes.tdnodes) >= 3 and nodes.node1 is not None and nodes.node2 is not None and nodes.node3 is not None
+
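+    # Lagging-node regression: stop the third node, keep writing so it falls
+    # behind, restart it, then poll "show <db>.vgroups" for up to 30 seconds
+    # so the replica can catch up before the final row count is queried.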
tdLog.info("syncingTest lagging-node regression skipped: fewer than 3 configured nodes") + tdSql.execute("drop database %s" % ctest.dbName) + elapsed = time.time() - started_at + tdLog.info("syncingTest elapsed_seconds=%.3f" % elapsed) tdSql.close() tdLog.success("%s successfully executed" % __file__)