diff --git a/Raft/raft_wal.go b/Raft/raft_wal.go index 32e1fd5..b2abaec 100644 --- a/Raft/raft_wal.go +++ b/Raft/raft_wal.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "io" "os" "path/filepath" "strconv" @@ -167,7 +168,7 @@ func (w *RaftWAL) LoadLogs() ([]LogEntry, error) { break } cmd := make([]byte, cmdLen) - if _, err := f.Read(cmd); err != nil { + if _, err := io.ReadFull(f, cmd); err != nil { break } @@ -182,11 +183,15 @@ func (w *RaftWAL) LoadLogs() ([]LogEntry, error) { } func (w *RaftWAL) Clear() error { - w.Close() - if err := os.Remove(w.logPath); err != nil { + if err := w.Close(); err != nil { + return err + } + w.file = nil + + if err := os.Remove(w.logPath); err != nil && !os.IsNotExist(err) { return err } - if err := os.Remove(w.metaPath); err != nil { + if err := os.Remove(w.metaPath); err != nil && !os.IsNotExist(err) { return err } diff --git a/storage/zstorage/WAL.go b/storage/zstorage/WAL.go index f4f224e..491fa39 100644 --- a/storage/zstorage/WAL.go +++ b/storage/zstorage/WAL.go @@ -7,34 +7,36 @@ import ( "io" "log/slog" "os" + "sync" "github.com/NeverENG/BanDB/config" "github.com/NeverENG/BanDB/storage/istorage" ) -const HEADER_LENGTH = 12 +const headerLength = 12 var _ istorage.IWal = &WAL{} type WAL struct { + mu sync.Mutex file *os.File - headerBuf [HEADER_LENGTH]byte + headerBuf [headerLength]byte } func NewWAL() *WAL { file, err := os.OpenFile(config.G.WALPath, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) if err != nil { - slog.Error("[ERROR]:OPEN WAL LOG ERROR !", "path", config.G.WALPath, "error", err) - slog.Warn("[WARN]:WAL DISABLED, DATA WILL NOT BE PERSISTED") - // 返回一个空的WAL,不报错 + slog.Warn("cannot open WAL, running in disabled mode", "path", config.G.WALPath, "error", err) return &WAL{file: nil} } - slog.Info("[INFO]:WAL OPENED SUCCESSFULLY", "path", config.G.WALPath) + slog.Info("WAL opened", "path", config.G.WALPath) return &WAL{file: file} } func (w *WAL) Write(entry istorage.LogEntry) error { - // 如果 file is nil,跳过写入(WAL 禁用模式 + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { return nil } @@ -48,28 +50,26 @@ func (w *WAL) Write(entry istorage.LogEntry) error { binary.BigEndian.PutUint32(w.headerBuf[4:], uint32(len(entry.Key))) binary.BigEndian.PutUint32(w.headerBuf[8:], uint32(len(entry.Value))) - _, err := w.file.Write(w.headerBuf[:]) - if err != nil { - slog.Error("[ERROR]:WRITE WAL LOG ERROR !") + if _, err := w.file.Write(w.headerBuf[:]); err != nil { + slog.Error("write WAL header failed", "error", err) return err } - - _, err = w.file.Write(entry.Key) - if err != nil { - slog.Error("[ERROR]:WRITE WAL LOG ERROR !") + if _, err := w.file.Write(entry.Key); err != nil { + slog.Error("write WAL key failed", "error", err) return err } - _, err = w.file.Write(entry.Value) - if err != nil { - slog.Error("[ERROR]:WRITE WAL LOG ERROR !") + if _, err := w.file.Write(entry.Value); err != nil { + slog.Error("write WAL value failed", "error", err) return err } - return w.Sync() + return w.file.Sync() } func (w *WAL) Read() ([]istorage.LogEntry, error) { - // 如果 file is nil,跳过读 + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { return nil, nil } @@ -81,10 +81,10 @@ func (w *WAL) Read() ([]istorage.LogEntry, error) { entries := make([]istorage.LogEntry, 0) for { - header := make([]byte, HEADER_LENGTH) - _, err := w.file.Read(header) + header := make([]byte, headerLength) + _, err := io.ReadFull(w.file, header) if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, os.ErrClosed) { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, os.ErrClosed) { break } return entries, err @@ -95,18 +95,16 @@ func (w *WAL) Read() ([]istorage.LogEntry, error) { valueLen := binary.BigEndian.Uint32(header[8:]) key := make([]byte, keyLen) - _, err = w.file.Read(key) - if err != nil { - if errors.Is(err, io.EOF) { + if _, err := io.ReadFull(w.file, key); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { break } return entries, err } value := make([]byte, valueLen) - _, err = w.file.Read(value) - if err != nil { - if errors.Is(err, io.EOF) { + if _, err := io.ReadFull(w.file, value); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { break } return entries, err @@ -116,7 +114,7 @@ func (w *WAL) Read() ([]istorage.LogEntry, error) { hasher.Write(key) hasher.Write(value) if crc != hasher.Sum32() { - slog.Error("[ERROR]:THE DATA ERROR !") + slog.Error("WAL data corruption detected") return entries, errors.New("data corruption detected") } @@ -127,35 +125,58 @@ func (w *WAL) Read() ([]istorage.LogEntry, error) { } func (w *WAL) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { return nil } return w.file.Close() } + func (w *WAL) Sync() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { return nil } return w.file.Sync() } -// 采用日志滚动的模式来启动 Clear func (w *WAL) Clear() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.file == nil { + // WAL disabled mode: try to remove old file by path, then reopen + if err := os.Remove(config.G.WALPath); err != nil && !os.IsNotExist(err) { + return err + } + f, err := os.OpenFile(config.G.WALPath, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + slog.Warn("WAL disabled, cannot reopen after clear", "error", err) + return nil + } + w.file = f + return nil + } - if err := w.Close(); err != nil { - slog.Error("[ERROR]:CLOSE WAL LOG ERROR !") + path := w.file.Name() + if err := w.file.Close(); err != nil { + slog.Error("close WAL before clear failed", "error", err) return err } - if err := os.Remove(w.file.Name()); err != nil { + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { return err } - f, err := os.OpenFile(w.file.Name(), os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) + f, err := os.OpenFile(path, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) if err != nil { - slog.Error("[ERROR]:OPEN WAL LOG ERROR !") + slog.Error("reopen WAL after clear failed", "error", err) return err } w.file = f - return w.Sync() + return w.file.Sync() }