-
Notifications
You must be signed in to change notification settings - Fork 1
refactor: Raft 日志规范化 + 增量持久化 + 单节点快速选举 #37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ package Raft | |
| import ( | ||
| "encoding/binary" | ||
| "fmt" | ||
| "log/slog" | ||
| "math/rand" | ||
| "sync" | ||
| "time" | ||
|
|
@@ -95,7 +96,7 @@ func NewRaft(peers []string, me int) *Raft { | |
|
|
||
| // 从磁盘加载持久化状态(currentTerm, votedFor, log, snapshot metadata) | ||
| if err := r.readPersist(); err != nil { | ||
| fmt.Printf("[RAFT WARN] Failed to load persisted state: %v\n", err) | ||
| slog.Warn("failed to load persisted state", "error", err) | ||
| } | ||
|
|
||
| // 如果有快照,通知 FSM | ||
|
|
@@ -110,7 +111,7 @@ func NewRaft(peers []string, me int) *Raft { | |
| IsSnapshot: true, | ||
| }: | ||
| default: | ||
| fmt.Println("[WARN] ApplyCh is full during initialization, snapshot skipped") | ||
| slog.Warn("ApplyCh full during init, snapshot skipped") | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -122,7 +123,14 @@ func NewRaft(peers []string, me int) *Raft { | |
| return r | ||
| } | ||
|
|
||
| // persistLocked 持久化 Raft 状态(必须在持有锁的情况下调用) | ||
| // persistStateLocked 仅持久化 Term 和 votedFor(增量持久化,O(1)) | ||
| func (r *Raft) persistStateLocked() { | ||
| if err := r.wal.SaveState(int64(r.Term), int64(r.votedFor)); err != nil { | ||
| slog.Error("failed to persist state", "error", err) | ||
| } | ||
| } | ||
|
|
||
| // persistLocked 全量持久化 Raft 状态(仅用于日志冲突截断等特殊情况) | ||
| func (r *Raft) persistLocked() { | ||
| data := PersistData{ | ||
| CurrentTerm: int64(r.Term), | ||
|
|
@@ -133,7 +141,7 @@ func (r *Raft) persistLocked() { | |
| } | ||
|
|
||
| if err := r.wal.SavePersist(data); err != nil { | ||
| fmt.Printf("[RAFT ERROR] Failed to persist state: %v\n", err) | ||
| slog.Error("failed to persist state", "error", err) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -188,18 +196,21 @@ func (r *Raft) startElection() { | |
| return | ||
| } | ||
|
|
||
| fmt.Printf("[RAFT] Starting election, current state=%v, Term=%d\n", r.state, r.Term) | ||
| slog.Info("election commenced", "term", r.Term) | ||
|
|
||
| r.state = Candidate | ||
| r.Term++ | ||
| r.votedFor = r.me | ||
| r.persistLocked() // 持久化 Term 和 votedFor | ||
| r.persistStateLocked() | ||
|
|
||
| lastLogIndex := int(r.LastIncludedIndex) | ||
| lastLogTerm := int(r.LastIncludedTerm) | ||
| lastLogIndex := -1 | ||
| lastLogTerm := 0 | ||
| if len(r.log) > 0 { | ||
| lastLogIndex = r.log[len(r.log)-1].Index | ||
| lastLogTerm = r.log[len(r.log)-1].Term | ||
| } else if r.LastIncludedIndex > 0 { | ||
| lastLogIndex = int(r.LastIncludedIndex) | ||
| lastLogTerm = int(r.LastIncludedTerm) | ||
|
Comment on lines
+206
to
+213
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Keep the new zero-based index convention consistent across Raft. After these changes, the first real log entry becomes index `0`. Also applies to: 306-310, 524-527, 540-545 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| args := &RequestVoteArgs{ | ||
|
|
@@ -247,6 +258,16 @@ func (r *Raft) startElection() { | |
|
|
||
| r.mu.Unlock() | ||
|
|
||
| // 单节点模式:无需等待投票,直接成为 Leader | ||
| if peerCount == 0 { | ||
| r.mu.Lock() | ||
| if r.state == Candidate { | ||
| r.becomeLeader() | ||
| } | ||
| r.mu.Unlock() | ||
| return | ||
| } | ||
|
|
||
| // 等待投票结果或超时 | ||
| timeout := time.After(500 * time.Millisecond) | ||
| for j := 0; j < peerCount; j++ { | ||
|
|
@@ -278,21 +299,22 @@ func (r *Raft) startElection() { | |
| } | ||
|
|
||
| func (r *Raft) becomeLeader() { | ||
| fmt.Printf("[RAFT] Becoming Leader, Term=%d\n", r.Term) | ||
| slog.Info("leader elected", "term", r.Term) | ||
| r.state = Leader | ||
|
|
||
| // 计算下一个日志的绝对索引(考虑快照偏移) | ||
| nextLogIndex := int(r.LastIncludedIndex) + 1 | ||
| nextLogIndex := 0 | ||
| if len(r.log) > 0 { | ||
| nextLogIndex = r.log[len(r.log)-1].Index + 1 | ||
| } else if r.LastIncludedIndex > 0 { | ||
| nextLogIndex = int(r.LastIncludedIndex) + 1 | ||
| } | ||
|
|
||
| for i := range r.peers { | ||
| r.nextIndex[i] = nextLogIndex | ||
| r.matchIndex[i] = int(r.LastIncludedIndex) | ||
| } | ||
|
|
||
| fmt.Printf("[RAFT] Started heartbeat loop\n") | ||
| r.startHeartbeatLoop() | ||
| } | ||
|
|
||
|
|
@@ -472,10 +494,9 @@ func (r *Raft) checkSnapshotTrigger() { | |
| if logLength > threshold { | ||
| snapshotIndex := r.commitIndex - keepEntries | ||
| if snapshotIndex > r.lastSnapshotIndex { | ||
| fmt.Printf("[RAFT] Auto-triggering snapshot at index %d (log length=%d, threshold=%d)\n", | ||
| snapshotIndex, logLength, threshold) | ||
| // 异步调用避免持锁死锁(checkSnapshotTrigger 在持锁上下文中被调用) | ||
| go r.TakeSnapshot(snapshotIndex) | ||
| slog.Info("auto-triggering snapshot", "index", snapshotIndex, "logLen", logLength, "threshold", threshold) | ||
| // 异步调用避免持锁死锁(checkSnapshotTrigger 在持锁上下文中被调用) | ||
| go r.TakeSnapshot(snapshotIndex) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -500,22 +521,27 @@ func (r *Raft) getLastLogIndex() int { | |
| if len(r.log) > 0 { | ||
| return r.log[len(r.log)-1].Index | ||
| } | ||
| return int(r.LastIncludedIndex) | ||
| if r.LastIncludedIndex > 0 { | ||
| return int(r.LastIncludedIndex) | ||
| } | ||
| return -1 | ||
| } | ||
|
|
||
| func (r *Raft) AppendEntry(command []byte) (int, error) { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
|
|
||
| if r.state != Leader { | ||
| fmt.Printf("[RAFT] AppendEntry failed: not leader, state=%v\n", r.state) | ||
| slog.Warn("AppendEntry rejected, not leader", "state", r.state) | ||
| return -1, fmt.Errorf("not leader") | ||
| } | ||
|
|
||
| // 计算绝对索引(考虑快照偏移) | ||
| lastLogIndex := int(r.LastIncludedIndex) | ||
| lastLogIndex := -1 | ||
| if len(r.log) > 0 { | ||
| lastLogIndex = r.log[len(r.log)-1].Index | ||
| } else if r.LastIncludedIndex > 0 { | ||
| lastLogIndex = int(r.LastIncludedIndex) | ||
| } | ||
|
|
||
| entry := LogEntry{ | ||
|
|
@@ -524,7 +550,11 @@ func (r *Raft) AppendEntry(command []byte) (int, error) { | |
| Command: command, | ||
| } | ||
| r.log = append(r.log, entry) | ||
| r.persistLocked() | ||
|
|
||
| // 增量持久化:仅追加一条日志 | ||
| if err := r.wal.AppendLog(entry); err != nil { | ||
| slog.Error("failed to append log", "error", err) | ||
| } | ||
|
|
||
| // 单节点模式:立即提交 | ||
| if len(r.peers) == 1 { | ||
|
|
@@ -736,14 +766,14 @@ func (r *Raft) TakeSnapshot(index int) error { | |
| } | ||
| select { | ||
| case r.ApplyCh <- snapshotEntry: | ||
| fmt.Printf("[RAFT] Snapshot replay sent to FSM: Index=%d, entries=%d\n", index, len(snapshotEntries)) | ||
| slog.Info("snapshot replay sent to FSM", "index", index, "entries", len(snapshotEntries)) | ||
| default: | ||
| fmt.Println("[WARN] ApplyCh is full, snapshot replay skipped") | ||
| slog.Warn("ApplyCh full, snapshot replay skipped") | ||
| } | ||
| } | ||
|
|
||
| // 7. 持久化状态 | ||
| r.persistLocked() | ||
| // 7. 持久化状态(日志已由 TruncateLogs 处理) | ||
| r.persistStateLocked() | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Persist every `Term`/`votedFor` mutation, not just election start.
This helper is a good optimization, but the higher-term reply paths still update `Term` and `votedFor` without calling it. A crash after one of those downgrades can restart the node with a stale term/vote.
🤖 Prompt for AI Agents