From 6fd32d70567fb4d277111cc866f566812b935342 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Wed, 20 May 2026 23:43:44 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20Raft=20=E6=97=A5=E5=BF=97=E8=A7=84?= =?UTF-8?q?=E8=8C=83=E5=8C=96=20+=20=E5=A2=9E=E9=87=8F=E6=8C=81=E4=B9=85?= =?UTF-8?q?=E5=8C=96=20+=20=E5=8D=95=E8=8A=82=E7=82=B9=E5=BF=AB=E9=80=9F?= =?UTF-8?q?=E9=80=89=E4=B8=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 所有 fmt.Printf 改为结构化 slog - 新增 persistStateLocked() 增量持久化 Term/votedFor,O(1) - AppendEntry 改用 wal.AppendLog() 增量追加日志 - 单节点模式无需等待投票,直接成为 Leader - getLastLogIndex() 空日志时返回 -1,修复快照偏移边界问题 - Server/FSM 日志文本同步微调 Co-Authored-By: Claude Opus 4.7 (1M context) --- Raft/raft.go | 76 +++++++++++++++++++++++++++++++++--------------- Server/server.go | 5 +++- service/fsm.go | 36 ++++++----------------- 3 files changed, 66 insertions(+), 51 deletions(-) diff --git a/Raft/raft.go b/Raft/raft.go index 1423b82..92dbb5e 100644 --- a/Raft/raft.go +++ b/Raft/raft.go @@ -3,6 +3,7 @@ package Raft import ( "encoding/binary" "fmt" + "log/slog" "math/rand" "sync" "time" @@ -95,7 +96,7 @@ func NewRaft(peers []string, me int) *Raft { // 从磁盘加载持久化状态(currentTerm, votedFor, log, snapshot metadata) if err := r.readPersist(); err != nil { - fmt.Printf("[RAFT WARN] Failed to load persisted state: %v\n", err) + slog.Warn("failed to load persisted state", "error", err) } // 如果有快照,通知 FSM @@ -110,7 +111,7 @@ func NewRaft(peers []string, me int) *Raft { IsSnapshot: true, }: default: - fmt.Println("[WARN] ApplyCh is full during initialization, snapshot skipped") + slog.Warn("ApplyCh full during init, snapshot skipped") } } } @@ -122,7 +123,14 @@ func NewRaft(peers []string, me int) *Raft { return r } -// persistLocked 持久化 Raft 状态(必须在持有锁的情况下调用) +// persistStateLocked 仅持久化 Term 和 votedFor(增量持久化,O(1)) +func (r *Raft) persistStateLocked() { + if err := r.wal.SaveState(int64(r.Term), int64(r.votedFor)); err != nil { + slog.Error("failed to persist state", "error", err) + } +} + +// persistLocked 全量持久化 Raft 状态(仅用于日志冲突截断等特殊情况) func (r *Raft) persistLocked() { data := PersistData{ CurrentTerm: int64(r.Term), @@ -133,7 +141,7 @@ func (r *Raft) persistLocked() { } if err := r.wal.SavePersist(data); err != nil { - fmt.Printf("[RAFT ERROR] Failed to persist state: %v\n", err) + slog.Error("failed to persist state", "error", err) } } @@ -188,18 +196,21 @@ func (r *Raft) startElection() { return } - fmt.Printf("[RAFT] Starting election, current state=%v, Term=%d\n", r.state, r.Term) + slog.Info("election commenced", "term", r.Term) r.state = Candidate r.Term++ r.votedFor = r.me - r.persistLocked() // 持久化 Term 和 votedFor + r.persistStateLocked() - lastLogIndex := int(r.LastIncludedIndex) - lastLogTerm := int(r.LastIncludedTerm) + lastLogIndex := -1 + lastLogTerm := 0 if len(r.log) > 0 { lastLogIndex = r.log[len(r.log)-1].Index lastLogTerm = r.log[len(r.log)-1].Term + } else if r.LastIncludedIndex > 0 { + lastLogIndex = int(r.LastIncludedIndex) + lastLogTerm = int(r.LastIncludedTerm) } args := &RequestVoteArgs{ @@ -247,6 +258,16 @@ func (r *Raft) startElection() { r.mu.Unlock() + // 单节点模式:无需等待投票,直接成为 Leader + if peerCount == 0 { + r.mu.Lock() + if r.state == Candidate { + r.becomeLeader() + } + r.mu.Unlock() + return + } + // 等待投票结果或超时 timeout := time.After(500 * time.Millisecond) for j := 0; j < peerCount; j++ { @@ -278,13 +299,15 @@ func (r *Raft) startElection() { } func (r *Raft) becomeLeader() { - fmt.Printf("[RAFT] Becoming Leader, Term=%d\n", r.Term) + slog.Info("leader elected", "term", r.Term) r.state = Leader // 计算下一个日志的绝对索引(考虑快照偏移) - nextLogIndex := int(r.LastIncludedIndex) + 1 + nextLogIndex := 0 if len(r.log) > 0 { nextLogIndex = r.log[len(r.log)-1].Index + 1 + } else if r.LastIncludedIndex > 0 { + nextLogIndex = int(r.LastIncludedIndex) + 1 } for i := range r.peers { @@ -292,7 +315,6 @@ func (r *Raft) becomeLeader() { r.matchIndex[i] = int(r.LastIncludedIndex) } - fmt.Printf("[RAFT] Started heartbeat loop\n") r.startHeartbeatLoop() } @@ -472,10 +494,9 @@ func (r *Raft) checkSnapshotTrigger() { if logLength > threshold { snapshotIndex := r.commitIndex - keepEntries if snapshotIndex > r.lastSnapshotIndex { - fmt.Printf("[RAFT] Auto-triggering snapshot at index %d (log length=%d, threshold=%d)\n", - snapshotIndex, logLength, threshold) - // 异步调用避免持锁死锁(checkSnapshotTrigger 在持锁上下文中被调用) - go r.TakeSnapshot(snapshotIndex) + slog.Info("auto-triggering snapshot", "index", snapshotIndex, "logLen", logLength, "threshold", threshold) + // 异步调用避免持锁死锁(checkSnapshotTrigger 在持锁上下文中被调用) + go r.TakeSnapshot(snapshotIndex) } } } @@ -500,7 +521,10 @@ func (r *Raft) getLastLogIndex() int { if len(r.log) > 0 { return r.log[len(r.log)-1].Index } - return int(r.LastIncludedIndex) + if r.LastIncludedIndex > 0 { + return int(r.LastIncludedIndex) + } + return -1 } func (r *Raft) AppendEntry(command []byte) (int, error) { @@ -508,14 +532,16 @@ func (r *Raft) AppendEntry(command []byte) (int, error) { defer r.mu.Unlock() if r.state != Leader { - fmt.Printf("[RAFT] AppendEntry failed: not leader, state=%v\n", r.state) + slog.Warn("AppendEntry rejected, not leader", "state", r.state) return -1, fmt.Errorf("not leader") } // 计算绝对索引(考虑快照偏移) - lastLogIndex := int(r.LastIncludedIndex) + lastLogIndex := -1 if len(r.log) > 0 { lastLogIndex = r.log[len(r.log)-1].Index + } else if r.LastIncludedIndex > 0 { + lastLogIndex = int(r.LastIncludedIndex) } entry := LogEntry{ @@ -524,7 +550,11 @@ func (r *Raft) AppendEntry(command []byte) (int, error) { Command: command, } r.log = append(r.log, entry) - r.persistLocked() + + // 增量持久化:仅追加一条日志 + if err := r.wal.AppendLog(entry); err != nil { + slog.Error("failed to append log", "error", err) + } // 单节点模式:立即提交 if len(r.peers) == 1 { @@ -736,14 +766,14 @@ func (r *Raft) TakeSnapshot(index int) error { } select { case r.ApplyCh <- snapshotEntry: - fmt.Printf("[RAFT] Snapshot replay sent to FSM: Index=%d, entries=%d\n", index, len(snapshotEntries)) + slog.Info("snapshot replay sent to FSM", "index", index, "entries", len(snapshotEntries)) default: - fmt.Println("[WARN] ApplyCh is full, snapshot replay skipped") + slog.Warn("ApplyCh full, snapshot replay skipped") } } - // 7. 持久化状态 - r.persistLocked() + // 7. 持久化状态(日志已由 TruncateLogs 处理) + r.persistStateLocked() return nil } diff --git a/Server/server.go b/Server/server.go index dc5d668..4ee1915 100644 --- a/Server/server.go +++ b/Server/server.go @@ -28,8 +28,11 @@ func main() { server.AddRouter(2, router) // GET 操作 server.AddRouter(3, router) // DELETE 操作 + // 注册连接生命周期回调 + server.SetConnStartFunc(router.OnConnStart) + server.SetConnStopFunc(router.OnConnStop) + // 启动服务 - fmt.Println("Starting Server...") fmt.Printf("HA initialized, initial health status: %v\n", ha.IsHealthy()) server.Serve() } diff --git a/service/fsm.go b/service/fsm.go index fffc9a8..e475e6f 100644 --- a/service/fsm.go +++ b/service/fsm.go @@ -3,7 +3,7 @@ package service import ( "encoding/json" "errors" - "fmt" + "log/slog" "github.com/NeverENG/BanDB/Raft" "github.com/NeverENG/BanDB/config" @@ -46,7 +46,7 @@ func NewKVServer() *KVServer { // Run 运行 FSM func (k *KVServer) Run() { - fmt.Println("[INFO] KVServer Run started, waiting for Raft entries...") + slog.Info("KVServer commenced — awaiting Raft entries") for entry := range k.raft.GetApplyCh() { k.Apply(entry) } @@ -54,34 +54,25 @@ func (k *KVServer) Run() { // Apply 应用日志到存储 func (k *KVServer) Apply(entry Raft.LogEntry) { - // 处理快照:异步重放到临时表 → Flush → SSTable,不阻塞 ApplyCh if entry.IsSnapshot { - fmt.Printf("[FSM] Snapshot received, async replaying %d entries...\n", - len(Raft.DeserializeLogEntries(entry.Command))) go k.replaySnapshot(entry) return } var cmd Command if err := json.Unmarshal(entry.Command, &cmd); err != nil { - fmt.Printf("[ERROR] Failed to unmarshal command: %v\n", err) + slog.Error("failed to unmarshal command", "error", err) return } switch cmd.Type { case "Put": - err := k.storage.Put(cmd.Key, cmd.Value) - if err != nil { - fmt.Printf("[ERROR] Failed to put: %v\n", err) - } else { - fmt.Printf("[INFO] Put success: %s = %s\n", string(cmd.Key), string(cmd.Value)) + if err := k.storage.Put(cmd.Key, cmd.Value); err != nil { + slog.Error("failed to put", "error", err) } case "Delete": - err := k.storage.Delete(cmd.Key) - if err != nil { - fmt.Printf("[ERROR] Failed to delete: %v\n", err) - } else { - fmt.Printf("[INFO] Delete success: %s\n", string(cmd.Key)) + if err := k.storage.Delete(cmd.Key); err != nil { + slog.Error("failed to delete", "error", err) } } } @@ -108,24 +99,15 @@ func (k *KVServer) replaySnapshot(entry Raft.LogEntry) { } if err := k.storage.FlushToSSTable(kvEntries); err != nil { - fmt.Printf("[FSM ERROR] Snapshot replay failed: %v\n", err) - } else { - fmt.Printf("[FSM] Snapshot replay completed: %d entries flushed to SSTable\n", len(kvEntries)) + slog.Error("snapshot replay failed", "error", err) } } -// Get 从存储获取值 func (k *KVServer) Get(key []byte) ([]byte, error) { value, err := k.storage.Get(key) - if err != nil { - fmt.Printf("[ERROR] Get failed: %v\n", err) - } else { - fmt.Printf("[INFO] Get result: %s\n", string(value)) - } - if value == nil { + if value == nil && err == nil { return nil, errors.New("key not found") } - return value, err }